Forráskód Böngészése

HDFS-2856. Fix block protocol so that Datanodes don't require root or jsvc. Contributed by Chris Nauroth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1610474 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth 11 éve
szülő
commit
3b54223c0f
30 módosított fájl, 2013 hozzáadás és 725 törlés
  1. 3 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. 5 0
      hadoop-hdfs-project/hadoop-hdfs/pom.xml
  3. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
  4. 40 68
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  5. 2 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  6. 8 17
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
  7. 8 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
  8. 1 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java
  9. 12 7
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java
  10. 0 506
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java
  11. 38 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java
  12. 267 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
  13. 44 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/InvalidMagicNumberException.java
  14. 439 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
  15. 381 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
  16. 166 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java
  17. 21 12
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
  18. 6 8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
  19. 45 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
  20. 94 27
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  21. 18 58
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
  22. 29 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
  23. 31 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  24. 6 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
  25. 33 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
  26. 110 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferTestCase.java
  27. 155 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java
  28. 41 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithSaslDataTransfer.java
  29. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
  30. 5 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -284,6 +284,9 @@ Release 2.6.0 - UNRELEASED
 
     HDFS-3851. DFSOutputStream class code cleanup. (Jing Zhao via suresh)
 
+    HDFS-2856. Fix block protocol so that Datanodes don't require root or jsvc.
+    (cnauroth)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/pom.xml

@@ -141,6 +141,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java

@@ -744,7 +744,8 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
       }
     }
     try {
-      Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress);
+      Peer peer = remotePeerFactory.newConnectedPeer(inetSocketAddress, token,
+        datanode);
       if (LOG.isTraceEnabled()) {
         LOG.trace("nextTcpPeer: created newConnectedPeer " + peer);
       }

+ 40 - 68
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
@@ -135,6 +137,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolIterator;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -152,16 +155,19 @@ import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -208,7 +214,8 @@ import com.google.common.net.InetAddresses;
  *
  ********************************************************/
 @InterfaceAudience.Private
-public class DFSClient implements java.io.Closeable, RemotePeerFactory {
+public class DFSClient implements java.io.Closeable, RemotePeerFactory,
+    DataEncryptionKeyFactory {
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
   public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
   static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
@@ -232,7 +239,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
   private final Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
   private DataEncryptionKey encryptionKey;
-  final TrustedChannelResolver trustedChannelResolver;
+  final SaslDataTransferClient saslClient;
   private final CachingStrategy defaultReadCachingStrategy;
   private final CachingStrategy defaultWriteCachingStrategy;
   private final ClientContext clientContext;
@@ -634,7 +641,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
     if (numThreads > 0) {
       this.initThreadsNumForHedgedReads(numThreads);
     }
-    this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConfiguration());
+    this.saslClient = new SaslDataTransferClient(
+      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+      TrustedChannelResolver.getInstance(conf),
+      conf.getBoolean(
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
   }
   
   /**
@@ -1804,23 +1816,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
                                      UnresolvedPathException.class);
      }
    }
-
-  /**
-   * Get the checksum of the whole file of a range of the file. Note that the
-   * range always starts from the beginning of the file.
-   * @param src The file path
-   * @param length The length of the range
-   * @return The checksum 
-   * @see DistributedFileSystem#getFileChecksum(Path)
-   */
-  public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
-      throws IOException {
-    checkOpen();
-    Preconditions.checkArgument(length >= 0);
-    return getFileChecksum(src, length, clientName, namenode,
-        socketFactory, dfsClientConf.socketTimeout, getDataEncryptionKey(),
-        dfsClientConf.connectToDnViaHostname);
-  }
   
   @InterfaceAudience.Private
   public void clearDataEncryptionKey() {
@@ -1840,11 +1835,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
     return d == null ? false : d.getEncryptDataTransfer();
   }
   
-  @InterfaceAudience.Private
-  public DataEncryptionKey getDataEncryptionKey()
-      throws IOException {
-    if (shouldEncryptData() && 
-        !this.trustedChannelResolver.isTrusted()) {
+  @Override
+  public DataEncryptionKey newDataEncryptionKey() throws IOException {
+    if (shouldEncryptData()) {
       synchronized (this) {
         if (encryptionKey == null ||
             encryptionKey.expiryDate < Time.now()) {
@@ -1859,22 +1852,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
   }
 
   /**
-   * Get the checksum of the whole file or a range of the file.
+   * Get the checksum of the whole file of a range of the file. Note that the
+   * range always starts from the beginning of the file.
    * @param src The file path
    * @param length the length of the range, i.e., the range is [0, length]
-   * @param clientName the name of the client requesting the checksum.
-   * @param namenode the RPC proxy for the namenode
-   * @param socketFactory to create sockets to connect to DNs
-   * @param socketTimeout timeout to use when connecting and waiting for a response
-   * @param encryptionKey the key needed to communicate with DNs in this cluster
-   * @param connectToDnViaHostname whether the client should use hostnames instead of IPs
    * @return The checksum 
+   * @see DistributedFileSystem#getFileChecksum(Path)
    */
-  private static MD5MD5CRC32FileChecksum getFileChecksum(String src,
-      long length, String clientName, ClientProtocol namenode,
-      SocketFactory socketFactory, int socketTimeout,
-      DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
+  public MD5MD5CRC32FileChecksum getFileChecksum(String src, long length)
       throws IOException {
+    checkOpen();
+    Preconditions.checkArgument(length >= 0);
     //get block locations for the file range
     LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0,
         length);
@@ -1909,7 +1897,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
       final DatanodeInfo[] datanodes = lb.getLocations();
       
       //try each datanode location of the block
-      final int timeout = 3000 * datanodes.length + socketTimeout;
+      final int timeout = 3000 * datanodes.length + dfsClientConf.socketTimeout;
       boolean done = false;
       for(int j = 0; !done && j < datanodes.length; j++) {
         DataOutputStream out = null;
@@ -1917,8 +1905,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
         
         try {
           //connect to a datanode
-          IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
-              encryptionKey, datanodes[j], timeout);
+          IOStreamPair pair = connectToDN(datanodes[j], timeout, lb);
           out = new DataOutputStream(new BufferedOutputStream(pair.out,
               HdfsConstants.SMALL_BUFFER_SIZE));
           in = new DataInputStream(pair.in);
@@ -1974,9 +1961,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
           } else {
             LOG.debug("Retrieving checksum from an earlier-version DataNode: " +
                       "inferring checksum by reading first byte");
-            ct = inferChecksumTypeByReading(
-                clientName, socketFactory, socketTimeout, lb, datanodes[j],
-                encryptionKey, connectToDnViaHostname);
+            ct = inferChecksumTypeByReading(lb, datanodes[j]);
           }
 
           if (i == 0) { // first block
@@ -2050,16 +2035,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
    * Connect to the given datanode's datantrasfer port, and return
    * the resulting IOStreamPair. This includes encryption wrapping, etc.
    */
-  private static IOStreamPair connectToDN(
-      SocketFactory socketFactory, boolean connectToDnViaHostname,
-      DataEncryptionKey encryptionKey, DatanodeInfo dn, int timeout)
-      throws IOException
-  {
+  private IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
+      LocatedBlock lb) throws IOException {
     boolean success = false;
     Socket sock = null;
     try {
       sock = socketFactory.createSocket();
-      String dnAddr = dn.getXferAddr(connectToDnViaHostname);
+      String dnAddr = dn.getXferAddr(getConf().connectToDnViaHostname);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Connecting to datanode " + dnAddr);
       }
@@ -2068,13 +2050,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
   
       OutputStream unbufOut = NetUtils.getOutputStream(sock);
       InputStream unbufIn = NetUtils.getInputStream(sock);
-      IOStreamPair ret;
-      if (encryptionKey != null) {
-        ret = DataTransferEncryptor.getEncryptedStreams(
-                unbufOut, unbufIn, encryptionKey);
-      } else {
-        ret = new IOStreamPair(unbufIn, unbufOut);        
-      }
+      IOStreamPair ret = saslClient.newSocketSend(sock, unbufOut, unbufIn, this,
+        lb.getBlockToken(), dn);
       success = true;
       return ret;
     } finally {
@@ -2090,21 +2067,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
    * with older HDFS versions which did not include the checksum type in
    * OpBlockChecksumResponseProto.
    *
-   * @param in input stream from datanode
-   * @param out output stream to datanode
    * @param lb the located block
-   * @param clientName the name of the DFSClient requesting the checksum
    * @param dn the connected datanode
    * @return the inferred checksum type
    * @throws IOException if an error occurs
    */
-  private static Type inferChecksumTypeByReading(
-      String clientName, SocketFactory socketFactory, int socketTimeout,
-      LocatedBlock lb, DatanodeInfo dn,
-      DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
+  private Type inferChecksumTypeByReading(LocatedBlock lb, DatanodeInfo dn)
       throws IOException {
-    IOStreamPair pair = connectToDN(socketFactory, connectToDnViaHostname,
-        encryptionKey, dn, socketTimeout);
+    IOStreamPair pair = connectToDN(dn, dfsClientConf.socketTimeout, lb);
 
     try {
       DataOutputStream out = new DataOutputStream(new BufferedOutputStream(pair.out,
@@ -2857,7 +2827,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
   }
 
   @Override // RemotePeerFactory
-  public Peer newConnectedPeer(InetSocketAddress addr) throws IOException {
+  public Peer newConnectedPeer(InetSocketAddress addr,
+      Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
+      throws IOException {
     Peer peer = null;
     boolean success = false;
     Socket sock = null;
@@ -2866,8 +2838,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
       NetUtils.connect(sock, addr,
         getRandomLocalInterfaceAddr(),
         dfsClientConf.socketTimeout);
-      peer = TcpPeerServer.peerFromSocketAndKey(sock, 
-          getDataEncryptionKey());
+      peer = TcpPeerServer.peerFromSocketAndKey(saslClient, sock, this,
+          blockToken, datanodeId);
       success = true;
       return peer;
     } finally {

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -561,6 +561,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
   public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
   public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
+  public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
+  public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
   
   // Journal-node related configs. These are read on the JN side.
   public static final String  DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";

+ 8 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -61,7 +61,6 @@ import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
@@ -1047,14 +1046,10 @@ public class DFSOutputStream extends FSOutputSummer
         
         OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
         InputStream unbufIn = NetUtils.getInputStream(sock);
-        if (dfsClient.shouldEncryptData() && 
-            !dfsClient.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
-          IOStreamPair encryptedStreams =
-              DataTransferEncryptor.getEncryptedStreams(
-                  unbufOut, unbufIn, dfsClient.getDataEncryptionKey());
-          unbufOut = encryptedStreams.out;
-          unbufIn = encryptedStreams.in;
-        }
+        IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
+          unbufOut, unbufIn, dfsClient, blockToken, src);
+        unbufOut = saslStreams.out;
+        unbufIn = saslStreams.in;
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             HdfsConstants.SMALL_BUFFER_SIZE));
         in = new DataInputStream(unbufIn);
@@ -1325,14 +1320,10 @@ public class DFSOutputStream extends FSOutputSummer
           
           OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
           InputStream unbufIn = NetUtils.getInputStream(s);
-          if (dfsClient.shouldEncryptData()  && 
-              !dfsClient.trustedChannelResolver.isTrusted(s.getInetAddress())) {
-            IOStreamPair encryptedStreams =
-                DataTransferEncryptor.getEncryptedStreams(unbufOut,
-                    unbufIn, dfsClient.getDataEncryptionKey());
-            unbufOut = encryptedStreams.out;
-            unbufIn = encryptedStreams.in;
-          }
+          IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
+            unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
+          unbufOut = saslStreams.out;
+          unbufIn = saslStreams.in;
           out = new DataOutputStream(new BufferedOutputStream(unbufOut,
               HdfsConstants.SMALL_BUFFER_SIZE));
           blockReplyStream = new DataInputStream(unbufIn);

+ 8 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java

@@ -21,15 +21,21 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
 
 public interface RemotePeerFactory {
   /**
    * @param addr          The address to connect to.
-   * 
+   * @param blockToken    Token used during optional SASL negotiation
+   * @param datanodeId    ID of destination DataNode
    * @return              A new Peer connected to the address.
    *
    * @throws IOException  If there was an error connecting or creating 
    *                      the remote socket, encrypted stream, etc.
    */
-  Peer newConnectedPeer(InetSocketAddress addr) throws IOException;
+  Peer newConnectedPeer(InetSocketAddress addr,
+      Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
+      throws IOException;
 }

+ 1 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java

@@ -19,9 +19,7 @@ package org.apache.hadoop.hdfs.net;
 
 import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.net.unix.DomainSocket;
 
 import java.io.InputStream;
@@ -51,11 +49,8 @@ public class EncryptedPeer implements Peer {
    */
   private final ReadableByteChannel channel;
 
-  public EncryptedPeer(Peer enclosedPeer, DataEncryptionKey key)
-      throws IOException {
+  public EncryptedPeer(Peer enclosedPeer, IOStreamPair ios) {
     this.enclosedPeer = enclosedPeer;
-    IOStreamPair ios = DataTransferEncryptor.getEncryptedStreams(
-        enclosedPeer.getOutputStream(), enclosedPeer.getInputStream(), key);
     this.in = ios.in;
     this.out = ios.out;
     this.channel = ios.in instanceof ReadableByteChannel ? 

+ 12 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java

@@ -28,10 +28,14 @@ import java.nio.channels.SocketChannel;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.token.Token;
 
 @InterfaceAudience.Private
 public class TcpPeerServer implements PeerServer {
@@ -74,15 +78,16 @@ public class TcpPeerServer implements PeerServer {
     }
   }
 
-  public static Peer peerFromSocketAndKey(Socket s,
-        DataEncryptionKey key) throws IOException {
+  public static Peer peerFromSocketAndKey(
+        SaslDataTransferClient saslClient, Socket s,
+        DataEncryptionKeyFactory keyFactory,
+        Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
+        throws IOException {
     Peer peer = null;
     boolean success = false;
     try {
-      peer = peerFromSocket(s); 
-      if (key != null) {
-        peer = new EncryptedPeer(peer, key);
-      }
+      peer = peerFromSocket(s);
+      peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId);
       success = true;
       return peer;
     } finally {

+ 0 - 506
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferEncryptor.java

@@ -1,506 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol.datatransfer;
-
-import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Map;
-import java.util.TreeMap;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.RealmChoiceCallback;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
-import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.security.SaslInputStream;
-import org.apache.hadoop.security.SaslOutputStream;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
-
-/**
- * A class which, given connected input/output streams, will perform a
- * handshake using those streams based on SASL to produce new Input/Output
- * streams which will encrypt/decrypt all data written/read from said streams.
- * Much of this is inspired by or borrowed from the TSaslTransport in Apache
- * Thrift, but with some HDFS-specific tweaks.
- */
-@InterfaceAudience.Private
-public class DataTransferEncryptor {
-  
-  public static final Log LOG = LogFactory.getLog(DataTransferEncryptor.class);
-  
-  /**
-   * Sent by clients and validated by servers. We use a number that's unlikely
-   * to ever be sent as the value of the DATA_TRANSFER_VERSION.
-   */
-  private static final int ENCRYPTED_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
-  
-  /**
-   * Delimiter for the three-part SASL username string.
-   */
-  private static final String NAME_DELIMITER = " ";
-  
-  // This has to be set as part of the SASL spec, but it don't matter for
-  // our purposes, but may not be empty. It's sent over the wire, so use
-  // a short string.
-  private static final String SERVER_NAME = "0";
-  
-  private static final String PROTOCOL = "hdfs";
-  private static final String MECHANISM = "DIGEST-MD5";
-  private static final Map<String, String> SASL_PROPS = new TreeMap<String, String>();
-  
-  static {
-    SASL_PROPS.put(Sasl.QOP, "auth-conf");
-    SASL_PROPS.put(Sasl.SERVER_AUTH, "true");
-  }
-  
-  /**
-   * Factory method for DNs, where the nonce, keyId, and encryption key are not
-   * yet known. The nonce and keyId will be sent by the client, and the DN
-   * will then use those pieces of info and the secret key shared with the NN
-   * to determine the encryptionKey used for the SASL handshake/encryption.
-   * 
-   * Establishes a secure connection assuming that the party on the other end
-   * has the same shared secret. This does a SASL connection handshake, but not
-   * a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with
-   * auth-conf enabled. In particular, it doesn't support an arbitrary number of
-   * challenge/response rounds, and we know that the client will never have an
-   * initial response, so we don't check for one.
-   *
-   * @param underlyingOut output stream to write to the other party
-   * @param underlyingIn input stream to read from the other party
-   * @param blockPoolTokenSecretManager secret manager capable of constructing
-   *        encryption key based on keyId, blockPoolId, and nonce
-   * @return a pair of streams which wrap the given streams and encrypt/decrypt
-   *         all data read/written
-   * @throws IOException in the event of error
-   */
-  public static IOStreamPair getEncryptedStreams(
-      OutputStream underlyingOut, InputStream underlyingIn,
-      BlockPoolTokenSecretManager blockPoolTokenSecretManager,
-      String encryptionAlgorithm) throws IOException {
-    
-    DataInputStream in = new DataInputStream(underlyingIn);
-    DataOutputStream out = new DataOutputStream(underlyingOut);
-    
-    Map<String, String> saslProps = Maps.newHashMap(SASL_PROPS);
-    saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
-    
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Server using encryption algorithm " + encryptionAlgorithm);
-    }
-    
-    SaslParticipant sasl = new SaslParticipant(Sasl.createSaslServer(MECHANISM,
-        PROTOCOL, SERVER_NAME, saslProps,
-        new SaslServerCallbackHandler(blockPoolTokenSecretManager)));
-    
-    int magicNumber = in.readInt();
-    if (magicNumber != ENCRYPTED_TRANSFER_MAGIC_NUMBER) {
-      throw new InvalidMagicNumberException(magicNumber);
-    }
-    try {
-      // step 1
-      performSaslStep1(out, in, sasl);
-      
-      // step 2 (server-side only)
-      byte[] remoteResponse = readSaslMessage(in);
-      byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
-      sendSaslMessage(out, localResponse);
-      
-      // SASL handshake is complete
-      checkSaslComplete(sasl);
-      
-      return sasl.createEncryptedStreamPair(out, in);
-    } catch (IOException ioe) {
-      if (ioe instanceof SaslException &&
-          ioe.getCause() != null &&
-          ioe.getCause() instanceof InvalidEncryptionKeyException) {
-        // This could just be because the client is long-lived and hasn't gotten
-        // a new encryption key from the NN in a while. Upon receiving this
-        // error, the client will get a new encryption key from the NN and retry
-        // connecting to this DN.
-        sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage());
-      } else {
-        sendGenericSaslErrorMessage(out, ioe.getMessage());
-      }
-      throw ioe;
-    }
-  }
-  
-  /**
-   * Factory method for clients, where the encryption token is already created.
-   * 
-   * Establishes a secure connection assuming that the party on the other end
-   * has the same shared secret. This does a SASL connection handshake, but not
-   * a general-purpose one. It's specific to the MD5-DIGEST SASL mechanism with
-   * auth-conf enabled. In particular, it doesn't support an arbitrary number of
-   * challenge/response rounds, and we know that the client will never have an
-   * initial response, so we don't check for one.
-   *
-   * @param underlyingOut output stream to write to the other party
-   * @param underlyingIn input stream to read from the other party
-   * @param encryptionKey all info required to establish an encrypted stream
-   * @return a pair of streams which wrap the given streams and encrypt/decrypt
-   *         all data read/written
-   * @throws IOException in the event of error
-   */
-  public static IOStreamPair getEncryptedStreams(
-      OutputStream underlyingOut, InputStream underlyingIn,
-      DataEncryptionKey encryptionKey)
-          throws IOException {
-    
-    Map<String, String> saslProps = Maps.newHashMap(SASL_PROPS);
-    saslProps.put("com.sun.security.sasl.digest.cipher",
-        encryptionKey.encryptionAlgorithm);
-    
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Client using encryption algorithm " +
-          encryptionKey.encryptionAlgorithm);
-    }
-    
-    DataOutputStream out = new DataOutputStream(underlyingOut);
-    DataInputStream in = new DataInputStream(underlyingIn);
-    
-    String userName = getUserNameFromEncryptionKey(encryptionKey);
-    SaslParticipant sasl = new SaslParticipant(Sasl.createSaslClient(
-        new String[] { MECHANISM }, userName, PROTOCOL, SERVER_NAME, saslProps,
-        new SaslClientCallbackHandler(encryptionKey.encryptionKey, userName)));
-    
-    out.writeInt(ENCRYPTED_TRANSFER_MAGIC_NUMBER);
-    out.flush();
-    
-    try {
-      // Start of handshake - "initial response" in SASL terminology.
-      sendSaslMessage(out, new byte[0]);
-      
-      // step 1
-      performSaslStep1(out, in, sasl);
-      
-      // step 2 (client-side only)
-      byte[] remoteResponse = readSaslMessage(in);
-      byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
-      assert localResponse == null;
-      
-      // SASL handshake is complete
-      checkSaslComplete(sasl);
-      
-      return sasl.createEncryptedStreamPair(out, in);
-    } catch (IOException ioe) {
-      sendGenericSaslErrorMessage(out, ioe.getMessage());
-      throw ioe;
-    }
-  }
-  
-  private static void performSaslStep1(DataOutputStream out, DataInputStream in,
-      SaslParticipant sasl) throws IOException {
-    byte[] remoteResponse = readSaslMessage(in);
-    byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
-    sendSaslMessage(out, localResponse);
-  }
-  
-  private static void checkSaslComplete(SaslParticipant sasl) throws IOException {
-    if (!sasl.isComplete()) {
-      throw new IOException("Failed to complete SASL handshake");
-    }
-    
-    if (!sasl.supportsConfidentiality()) {
-      throw new IOException("SASL handshake completed, but channel does not " +
-          "support encryption");
-    }
-  }
-  
-  private static void sendSaslMessage(DataOutputStream out, byte[] payload)
-      throws IOException {
-    sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
-  }
-  
-  private static void sendInvalidKeySaslErrorMessage(DataOutputStream out,
-      String message) throws IOException {
-    sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null,
-        message);
-  }
-  
-  private static void sendGenericSaslErrorMessage(DataOutputStream out,
-      String message) throws IOException {
-    sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message);
-  }
-  
-  private static void sendSaslMessage(OutputStream out,
-      DataTransferEncryptorStatus status, byte[] payload, String message)
-          throws IOException {
-    DataTransferEncryptorMessageProto.Builder builder =
-        DataTransferEncryptorMessageProto.newBuilder();
-    
-    builder.setStatus(status);
-    if (payload != null) {
-      builder.setPayload(ByteString.copyFrom(payload));
-    }
-    if (message != null) {
-      builder.setMessage(message);
-    }
-    
-    DataTransferEncryptorMessageProto proto = builder.build();
-    proto.writeDelimitedTo(out);
-    out.flush();
-  }
-  
-  private static byte[] readSaslMessage(DataInputStream in) throws IOException {
-    DataTransferEncryptorMessageProto proto =
-        DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
-    if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
-      throw new InvalidEncryptionKeyException(proto.getMessage());
-    } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
-      throw new IOException(proto.getMessage());
-    } else {
-      return proto.getPayload().toByteArray();
-    }
-  }
-  
-  /**
-   * Set the encryption key when asked by the server-side SASL object.
-   */
-  private static class SaslServerCallbackHandler implements CallbackHandler {
-    
-    private final BlockPoolTokenSecretManager blockPoolTokenSecretManager;
-    
-    public SaslServerCallbackHandler(BlockPoolTokenSecretManager
-        blockPoolTokenSecretManager) {
-      this.blockPoolTokenSecretManager = blockPoolTokenSecretManager;
-    }
-
-    @Override
-    public void handle(Callback[] callbacks) throws IOException,
-        UnsupportedCallbackException {
-      NameCallback nc = null;
-      PasswordCallback pc = null;
-      AuthorizeCallback ac = null;
-      for (Callback callback : callbacks) {
-        if (callback instanceof AuthorizeCallback) {
-          ac = (AuthorizeCallback) callback;
-        } else if (callback instanceof PasswordCallback) {
-          pc = (PasswordCallback) callback;
-        } else if (callback instanceof NameCallback) {
-          nc = (NameCallback) callback;
-        } else if (callback instanceof RealmCallback) {
-          continue; // realm is ignored
-        } else {
-          throw new UnsupportedCallbackException(callback,
-              "Unrecognized SASL DIGEST-MD5 Callback: " + callback);
-        }
-      }
-      
-      if (pc != null) {
-        byte[] encryptionKey = getEncryptionKeyFromUserName(
-            blockPoolTokenSecretManager, nc.getDefaultName());
-        pc.setPassword(encryptionKeyToPassword(encryptionKey));
-      }
-      
-      if (ac != null) {
-        ac.setAuthorized(true);
-        ac.setAuthorizedID(ac.getAuthorizationID());
-      }
-      
-    }
-    
-  }
-  
-  /**
-   * Set the encryption key when asked by the client-side SASL object.
-   */
-  private static class SaslClientCallbackHandler implements CallbackHandler {
-    
-    private final byte[] encryptionKey;
-    private final String userName;
-    
-    public SaslClientCallbackHandler(byte[] encryptionKey, String userName) {
-      this.encryptionKey = encryptionKey;
-      this.userName = userName;
-    }
-
-    @Override
-    public void handle(Callback[] callbacks) throws IOException,
-        UnsupportedCallbackException {
-      NameCallback nc = null;
-      PasswordCallback pc = null;
-      RealmCallback rc = null;
-      for (Callback callback : callbacks) {
-        if (callback instanceof RealmChoiceCallback) {
-          continue;
-        } else if (callback instanceof NameCallback) {
-          nc = (NameCallback) callback;
-        } else if (callback instanceof PasswordCallback) {
-          pc = (PasswordCallback) callback;
-        } else if (callback instanceof RealmCallback) {
-          rc = (RealmCallback) callback;
-        } else {
-          throw new UnsupportedCallbackException(callback,
-              "Unrecognized SASL client callback");
-        }
-      }
-      if (nc != null) {
-        nc.setName(userName);
-      }
-      if (pc != null) {
-        pc.setPassword(encryptionKeyToPassword(encryptionKey));
-      }
-      if (rc != null) {
-        rc.setText(rc.getDefaultText());
-      }
-    }
-    
-  }
-  
-  /**
-   * The SASL username consists of the keyId, blockPoolId, and nonce with the
-   * first two encoded as Strings, and the third encoded using Base64. The
-   * fields are each separated by a single space.
-   * 
-   * @param encryptionKey the encryption key to encode as a SASL username.
-   * @return encoded username containing keyId, blockPoolId, and nonce
-   */
-  private static String getUserNameFromEncryptionKey(
-      DataEncryptionKey encryptionKey) {
-    return encryptionKey.keyId + NAME_DELIMITER +
-        encryptionKey.blockPoolId + NAME_DELIMITER +
-        new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
-  }
-  
-  /**
-   * Given a secret manager and a username encoded as described above, determine
-   * the encryption key.
-   * 
-   * @param blockPoolTokenSecretManager to determine the encryption key.
-   * @param userName containing the keyId, blockPoolId, and nonce.
-   * @return secret encryption key.
-   * @throws IOException
-   */
-  private static byte[] getEncryptionKeyFromUserName(
-      BlockPoolTokenSecretManager blockPoolTokenSecretManager, String userName)
-      throws IOException {
-    String[] nameComponents = userName.split(NAME_DELIMITER);
-    if (nameComponents.length != 3) {
-      throw new IOException("Provided name '" + userName + "' has " +
-          nameComponents.length + " components instead of the expected 3.");
-    }
-    int keyId = Integer.parseInt(nameComponents[0]);
-    String blockPoolId = nameComponents[1];
-    byte[] nonce = Base64.decodeBase64(nameComponents[2]);
-    return blockPoolTokenSecretManager.retrieveDataEncryptionKey(keyId,
-        blockPoolId, nonce);
-  }
-  
-  private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
-    return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray();
-  }
-  
-  /**
-   * Strongly inspired by Thrift's TSaslTransport class.
-   * 
-   * Used to abstract over the <code>SaslServer</code> and
-   * <code>SaslClient</code> classes, which share a lot of their interface, but
-   * unfortunately don't share a common superclass.
-   */
-  private static class SaslParticipant {
-    // One of these will always be null.
-    public SaslServer saslServer;
-    public SaslClient saslClient;
-
-    public SaslParticipant(SaslServer saslServer) {
-      this.saslServer = saslServer;
-    }
-
-    public SaslParticipant(SaslClient saslClient) {
-      this.saslClient = saslClient;
-    }
-    
-    public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse) throws SaslException {
-      if (saslClient != null) {
-        return saslClient.evaluateChallenge(challengeOrResponse);
-      } else {
-        return saslServer.evaluateResponse(challengeOrResponse);
-      }
-    }
-
-    public boolean isComplete() {
-      if (saslClient != null)
-        return saslClient.isComplete();
-      else
-        return saslServer.isComplete();
-    }
-    
-    public boolean supportsConfidentiality() {
-      String qop = null;
-      if (saslClient != null) {
-        qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
-      } else {
-        qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
-      }
-      return qop != null && qop.equals("auth-conf");
-    }
-    
-    // Return some input/output streams that will henceforth have their
-    // communication encrypted.
-    private IOStreamPair createEncryptedStreamPair(
-        DataOutputStream out, DataInputStream in) {
-      if (saslClient != null) {
-        return new IOStreamPair(
-            new SaslInputStream(in, saslClient),
-            new SaslOutputStream(out, saslClient));
-      } else {
-        return new IOStreamPair(
-            new SaslInputStream(in, saslServer),
-            new SaslOutputStream(out, saslServer));
-      }
-    }
-  }
-  
-  @InterfaceAudience.Private
-  public static class InvalidMagicNumberException extends IOException {
-    
-    private static final long serialVersionUID = 1L;
-
-    public InvalidMagicNumberException(int magicNumber) {
-      super(String.format("Received %x instead of %x from client.",
-          magicNumber, ENCRYPTED_TRANSFER_MAGIC_NUMBER));
-    }
-  }
-  
-}

+ 38 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataEncryptionKeyFactory.java

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+
+/**
+ * Creates a new {@link DataEncryptionKey} on demand.
+ */
+@InterfaceAudience.Private
+public interface DataEncryptionKeyFactory {
+
+  /**
+   * Creates a new DataEncryptionKey.
+   *
+   * @return DataEncryptionKey newly created
+   * @throws IOException for any error
+   */
+  DataEncryptionKey newDataEncryptionKey() throws IOException;
+}

+ 267 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java

@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY;
+import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import javax.security.sasl.Sasl;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.net.InetAddresses;
+import com.google.protobuf.ByteString;
+
+/**
+ * Utility methods implementing SASL negotiation for DataTransferProtocol.
+ */
+@InterfaceAudience.Private
+public final class DataTransferSaslUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+    DataTransferSaslUtil.class);
+
+  /**
+   * Delimiter for the three-part SASL username string.
+   */
+  public static final String NAME_DELIMITER = " ";
+
+  /**
+   * Sent by clients and validated by servers. We use a number that's unlikely
+   * to ever be sent as the value of the DATA_TRANSFER_VERSION.
+   */
+  public static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
+
+  /**
+   * Checks that SASL negotiation has completed for the given participant, and
+   * the negotiated quality of protection is included in the given SASL
+   * properties and therefore acceptable.
+   *
+   * @param sasl participant to check
+   * @param saslProps properties of SASL negotiation
+   * @throws IOException for any error
+   */
+  public static void checkSaslComplete(SaslParticipant sasl,
+      Map<String, String> saslProps) throws IOException {
+    if (!sasl.isComplete()) {
+      throw new IOException("Failed to complete SASL handshake");
+    }
+    Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList(
+      saslProps.get(Sasl.QOP).split(",")));
+    String negotiatedQop = sasl.getNegotiatedQop();
+    LOG.debug("Verifying QOP, requested QOP = {}, negotiated QOP = {}",
+      requestedQop, negotiatedQop);
+    if (!requestedQop.contains(negotiatedQop)) {
+      throw new IOException(String.format("SASL handshake completed, but " +
+        "channel does not have acceptable quality of protection, " +
+        "requested = %s, negotiated = %s", requestedQop, negotiatedQop));
+    }
+  }
+
+  /**
+   * Creates SASL properties required for an encrypted SASL negotiation.
+   *
+   * @param encryptionAlgorithm to use for SASL negotation
+   * @return properties of encrypted SASL negotiation
+   */
+  public static Map<String, String> createSaslPropertiesForEncryption(
+      String encryptionAlgorithm) {
+    Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
+    saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
+    saslProps.put(Sasl.SERVER_AUTH, "true");
+    saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
+    return saslProps;
+  }
+
+  /**
+   * For an encrypted SASL negotiation, encodes an encryption key to a SASL
+   * password.
+   *
+   * @param encryptionKey to encode
+   * @return key encoded as SASL password
+   */
+  public static char[] encryptionKeyToPassword(byte[] encryptionKey) {
+    return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8)
+      .toCharArray();
+  }
+
+  /**
+   * Returns InetAddress from peer.  The getRemoteAddressString has the form
+   * [host][/ip-address]:port.  The host may be missing.  The IP address (and
+   * preceding '/') may be missing.  The port preceded by ':' is always present.
+   *
+   * @param peer
+   * @return InetAddress from peer
+   */
+  public static InetAddress getPeerAddress(Peer peer) {
+    String remoteAddr = peer.getRemoteAddressString().split(":")[0];
+    int slashIdx = remoteAddr.indexOf('/');
+    return InetAddresses.forString(slashIdx != -1 ?
+        remoteAddr.substring(slashIdx + 1, remoteAddr.length()) :
+        remoteAddr);
+  }
+
+  /**
+   * Creates a SaslPropertiesResolver from the given configuration.  This method
+   * works by cloning the configuration, translating configuration properties
+   * specific to DataTransferProtocol to what SaslPropertiesResolver expects,
+   * and then delegating to SaslPropertiesResolver for initialization.  This
+   * method returns null if SASL protection has not been configured for
+   * DataTransferProtocol.
+   *
+   * @param conf configuration to read
+   * @return SaslPropertiesResolver for DataTransferProtocol, or null if not
+   *   configured
+   */
+  public static SaslPropertiesResolver getSaslPropertiesResolver(
+      Configuration conf) {
+    String qops = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
+    if (qops == null || qops.isEmpty()) {
+      LOG.debug("DataTransferProtocol not using SaslPropertiesResolver, no " +
+        "QOP found in configuration for {}", DFS_DATA_TRANSFER_PROTECTION_KEY);
+      return null;
+    }
+    Configuration saslPropsResolverConf = new Configuration(conf);
+    saslPropsResolverConf.set(HADOOP_RPC_PROTECTION, qops);
+    Class<? extends SaslPropertiesResolver> resolverClass = conf.getClass(
+      DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY,
+      SaslPropertiesResolver.class, SaslPropertiesResolver.class);
+    saslPropsResolverConf.setClass(HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
+      resolverClass, SaslPropertiesResolver.class);
+    SaslPropertiesResolver resolver = SaslPropertiesResolver.getInstance(
+      saslPropsResolverConf);
+    LOG.debug("DataTransferProtocol using SaslPropertiesResolver, configured " +
+      "QOP {} = {}, configured class {} = {}", DFS_DATA_TRANSFER_PROTECTION_KEY, qops, 
+      DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, resolverClass);
+    return resolver;
+  }
+
+  /**
+   * Performs the first step of SASL negotiation.
+   *
+   * @param out connection output stream
+   * @param in connection input stream
+   * @param sasl participant
+   */
+  public static void performSaslStep1(OutputStream out, InputStream in,
+      SaslParticipant sasl) throws IOException {
+    byte[] remoteResponse = readSaslMessage(in);
+    byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+    sendSaslMessage(out, localResponse);
+  }
+
+  /**
+   * Reads a SASL negotiation message.
+   *
+   * @param in stream to read
+   * @return bytes of SASL negotiation messsage
+   * @throws IOException for any error
+   */
+  public static byte[] readSaslMessage(InputStream in) throws IOException {
+    DataTransferEncryptorMessageProto proto =
+        DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
+    if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+      throw new InvalidEncryptionKeyException(proto.getMessage());
+    } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+      throw new IOException(proto.getMessage());
+    } else {
+      return proto.getPayload().toByteArray();
+    }
+  }
+
+  /**
+   * Sends a SASL negotiation message indicating an error.
+   *
+   * @param out stream to receive message
+   * @param message to send
+   * @throws IOException for any error
+   */
+  public static void sendGenericSaslErrorMessage(OutputStream out,
+      String message) throws IOException {
+    sendSaslMessage(out, DataTransferEncryptorStatus.ERROR, null, message);
+  }
+
+  /**
+   * Sends a SASL negotiation message.
+   *
+   * @param out stream to receive message
+   * @param payload to send
+   * @throws IOException for any error
+   */
+  public static void sendSaslMessage(OutputStream out, byte[] payload)
+      throws IOException {
+    sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
+  }
+
+  /**
+   * Sends a SASL negotiation message.
+   *
+   * @param out stream to receive message
+   * @param status negotiation status
+   * @param payload to send
+   * @param message to send
+   * @throws IOException for any error
+   */
+  public static void sendSaslMessage(OutputStream out,
+      DataTransferEncryptorStatus status, byte[] payload, String message)
+          throws IOException {
+    DataTransferEncryptorMessageProto.Builder builder =
+        DataTransferEncryptorMessageProto.newBuilder();
+    
+    builder.setStatus(status);
+    if (payload != null) {
+      builder.setPayload(ByteString.copyFrom(payload));
+    }
+    if (message != null) {
+      builder.setMessage(message);
+    }
+    
+    DataTransferEncryptorMessageProto proto = builder.build();
+    proto.writeDelimitedTo(out);
+    out.flush();
+  }
+
+  /**
+   * There is no reason to instantiate this class.
+   */
+  private DataTransferSaslUtil() {
+  }
+}

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/InvalidMagicNumberException.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.SASL_TRANSFER_MAGIC_NUMBER;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Indicates that SASL protocol negotiation expected to read a pre-defined magic
+ * number, but the expected value was not seen.
+ */
+@InterfaceAudience.Private
+public class InvalidMagicNumberException extends IOException {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates a new InvalidMagicNumberException.
+   *
+   * @param magicNumber expected value
+   */
+  public InvalidMagicNumberException(int magicNumber) {
+    super(String.format("Received %x instead of %x from client.",
+        magicNumber, SASL_TRANSFER_MAGIC_NUMBER));
+  }
+}

+ 439 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java

@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.net.EncryptedPeer;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Negotiates SASL for DataTransferProtocol on behalf of a client.  There are
+ * two possible supported variants of SASL negotiation: either a general-purpose
+ * negotiation supporting any quality of protection, or a specialized
+ * negotiation that enforces privacy as the quality of protection using a
+ * cryptographically strong encryption key.
+ *
+ * This class is used in both the HDFS client and the DataNode.  The DataNode
+ * needs it, because it acts as a client to other DataNodes during write
+ * pipelines and block transfers.
+ */
+@InterfaceAudience.Private
+public class SaslDataTransferClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+    SaslDataTransferClient.class);
+
+  private final boolean fallbackToSimpleAuthAllowed;
+  private final SaslPropertiesResolver saslPropsResolver;
+  private final TrustedChannelResolver trustedChannelResolver;
+
+  /**
+   * Creates a new SaslDataTransferClient.
+   *
+   * @param saslPropsResolver for determining properties of SASL negotiation
+   * @param trustedChannelResolver for identifying trusted connections that do
+   *   not require SASL negotiation
+   */
+  public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
+      TrustedChannelResolver trustedChannelResolver,
+      boolean fallbackToSimpleAuthAllowed) {
+    this.fallbackToSimpleAuthAllowed = fallbackToSimpleAuthAllowed;
+    this.saslPropsResolver = saslPropsResolver;
+    this.trustedChannelResolver = trustedChannelResolver;
+  }
+
+  /**
+   * Sends client SASL negotiation for a newly allocated socket if required.
+   *
+   * @param socket connection socket
+   * @param underlyingOut connection output stream
+   * @param underlyingIn connection input stream
+   * @param encryptionKeyFactory for creation of an encryption key
+   * @param accessToken connection block access token
+   * @param datanodeId ID of destination DataNode
+   * @return new pair of streams, wrapped after SASL negotiation
+   * @throws IOException for any error
+   */
+  public IOStreamPair newSocketSend(Socket socket, OutputStream underlyingOut,
+      InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
+      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+      throws IOException {
+    // The encryption key factory only returns a key if encryption is enabled.
+    DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ?
+      encryptionKeyFactory.newDataEncryptionKey() : null;
+    IOStreamPair ios = send(socket.getInetAddress(), underlyingOut,
+      underlyingIn, encryptionKey, accessToken, datanodeId);
+    return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
+  }
+
+  /**
+   * Sends client SASL negotiation for a peer if required.
+   *
+   * @param peer connection peer
+   * @param encryptionKeyFactory for creation of an encryption key
+   * @param accessToken connection block access token
+   * @param datanodeId ID of destination DataNode
+   * @return new pair of streams, wrapped after SASL negotiation
+   * @throws IOException for any error
+   */
+  public Peer peerSend(Peer peer, DataEncryptionKeyFactory encryptionKeyFactory,
+      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+      throws IOException {
+    IOStreamPair ios = checkTrustAndSend(getPeerAddress(peer),
+      peer.getOutputStream(), peer.getInputStream(), encryptionKeyFactory,
+      accessToken, datanodeId);
+    // TODO: Consider renaming EncryptedPeer to SaslPeer.
+    return ios != null ? new EncryptedPeer(peer, ios) : peer;
+  }
+
+  /**
+   * Sends client SASL negotiation for a socket if required.
+   *
+   * @param socket connection socket
+   * @param underlyingOut connection output stream
+   * @param underlyingIn connection input stream
+   * @param encryptionKeyFactory for creation of an encryption key
+   * @param accessToken connection block access token
+   * @param datanodeId ID of destination DataNode
+   * @return new pair of streams, wrapped after SASL negotiation
+   * @throws IOException for any error
+   */
+  public IOStreamPair socketSend(Socket socket, OutputStream underlyingOut,
+      InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
+      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+      throws IOException {
+    IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut,
+      underlyingIn, encryptionKeyFactory, accessToken, datanodeId);
+    return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
+  }
+
+  /**
+   * Checks if an address is already trusted and then sends client SASL
+   * negotiation if required.
+   *
+   * @param addr connection address
+   * @param underlyingOut connection output stream
+   * @param underlyingIn connection input stream
+   * @param encryptionKeyFactory for creation of an encryption key
+   * @param accessToken connection block access token
+   * @param datanodeId ID of destination DataNode
+   * @return new pair of streams, wrapped after SASL negotiation
+   * @throws IOException for any error
+   */
+  private IOStreamPair checkTrustAndSend(InetAddress addr,
+      OutputStream underlyingOut, InputStream underlyingIn,
+      DataEncryptionKeyFactory encryptionKeyFactory,
+      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+      throws IOException {
+    if (!trustedChannelResolver.isTrusted() &&
+        !trustedChannelResolver.isTrusted(addr)) {
+      // The encryption key factory only returns a key if encryption is enabled.
+      DataEncryptionKey encryptionKey =
+        encryptionKeyFactory.newDataEncryptionKey();
+      return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken,
+        datanodeId);
+    } else {
+      LOG.debug(
+        "SASL client skipping handshake on trusted connection for addr = {}, "
+        + "datanodeId = {}", addr, datanodeId);
+      return null;
+    }
+  }
+
+  /**
+   * Sends client SASL negotiation if required.  Determines the correct type of
+   * SASL handshake based on configuration.
+   *
+   * @param addr connection address
+   * @param underlyingOut connection output stream
+   * @param underlyingIn connection input stream
+   * @param encryptionKey for an encrypted SASL handshake
+   * @param accessToken connection block access token
+   * @param datanodeId ID of destination DataNode
+   * @return new pair of streams, wrapped after SASL negotiation
+   * @throws IOException for any error
+   */
+  private IOStreamPair send(InetAddress addr, OutputStream underlyingOut,
+      InputStream underlyingIn, DataEncryptionKey encryptionKey,
+      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+      throws IOException {
+    if (encryptionKey != null) {
+      LOG.debug(
+        "SASL client doing encrypted handshake for addr = {}, datanodeId = {}",
+        addr, datanodeId);
+      return getEncryptedStreams(underlyingOut, underlyingIn,
+        encryptionKey);
+    } else if (!UserGroupInformation.isSecurityEnabled()) {
+      LOG.debug(
+        "SASL client skipping handshake in unsecured configuration for "
+        + "addr = {}, datanodeId = {}", addr, datanodeId);
+      return null;
+    } else if (datanodeId.getXferPort() < 1024) {
+      LOG.debug(
+        "SASL client skipping handshake in secured configuration with "
+        + "privileged port for addr = {}, datanodeId = {}", addr, datanodeId);
+      return null;
+    } else if (accessToken.getIdentifier().length == 0) {
+      if (!fallbackToSimpleAuthAllowed) {
+        throw new IOException(
+          "No block access token was provided (insecure cluster), but this " +
+          "client is configured to allow only secure connections.");
+      }
+      LOG.debug(
+        "SASL client skipping handshake in secured configuration with "
+        + "unsecured cluster for addr = {}, datanodeId = {}", addr, datanodeId);
+      return null;
+    } else {
+      LOG.debug(
+        "SASL client doing general handshake for addr = {}, datanodeId = {}",
+        addr, datanodeId);
+      return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken,
+        datanodeId);
+    }
+  }
+
+  /**
+   * Sends client SASL negotiation for specialized encrypted handshake.
+   *
+   * @param underlyingOut connection output stream
+   * @param underlyingIn connection input stream
+   * @param encryptionKey for an encrypted SASL handshake
+   * @return new pair of streams, wrapped after SASL negotiation
+   * @throws IOException for any error
+   */
+  private IOStreamPair getEncryptedStreams(OutputStream underlyingOut,
+      InputStream underlyingIn, DataEncryptionKey encryptionKey)
+      throws IOException {
+    Map<String, String> saslProps = createSaslPropertiesForEncryption(
+      encryptionKey.encryptionAlgorithm);
+
+    LOG.debug("Client using encryption algorithm {}",
+      encryptionKey.encryptionAlgorithm);
+
+    String userName = getUserNameFromEncryptionKey(encryptionKey);
+    char[] password = encryptionKeyToPassword(encryptionKey.encryptionKey);
+    CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
+      password);
+    return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps,
+      callbackHandler);
+  }
+
+  /**
+   * The SASL username for an encrypted handshake consists of the keyId,
+   * blockPoolId, and nonce with the first two encoded as Strings, and the third
+   * encoded using Base64. The fields are each separated by a single space.
+   * 
+   * @param encryptionKey the encryption key to encode as a SASL username.
+   * @return encoded username containing keyId, blockPoolId, and nonce
+   */
+  private static String getUserNameFromEncryptionKey(
+      DataEncryptionKey encryptionKey) {
+    return encryptionKey.keyId + NAME_DELIMITER +
+        encryptionKey.blockPoolId + NAME_DELIMITER +
+        new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
+  }
+
+  /**
+   * Sets user name and password when asked by the client-side SASL object.
+   */
+  private static final class SaslClientCallbackHandler
+      implements CallbackHandler {
+
+    private final char[] password;
+    private final String userName;
+
+    /**
+     * Creates a new SaslClientCallbackHandler.
+     *
+     * @param userName SASL user name
+     * @Param password SASL password
+     */
+    public SaslClientCallbackHandler(String userName, char[] password) {
+      this.password = password;
+      this.userName = userName;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException,
+        UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      RealmCallback rc = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof RealmChoiceCallback) {
+          continue;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          rc = (RealmCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "Unrecognized SASL client callback");
+        }
+      }
+      if (nc != null) {
+        nc.setName(userName);
+      }
+      if (pc != null) {
+        pc.setPassword(password);
+      }
+      if (rc != null) {
+        rc.setText(rc.getDefaultText());
+      }
+    }
+  }
+
+  /**
+   * Sends client SASL negotiation for general-purpose handshake.
+   *
+   * @param addr connection address
+   * @param underlyingOut connection output stream
+   * @param underlyingIn connection input stream
+   * @param accessToken connection block access token
+   * @param datanodeId ID of destination DataNode
+   * @return new pair of streams, wrapped after SASL negotiation
+   * @throws IOException for any error
+   */
+  private IOStreamPair getSaslStreams(InetAddress addr,
+      OutputStream underlyingOut, InputStream underlyingIn,
+      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+      throws IOException {
+    if (saslPropsResolver == null) {
+      throw new IOException(String.format("Cannot create a secured " +
+        "connection if DataNode listens on unprivileged port (%d) and no " +
+        "protection is defined in configuration property %s.",
+        datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY));
+    }
+    Map<String, String> saslProps = saslPropsResolver.getClientProperties(addr);
+
+    String userName = buildUserName(accessToken);
+    char[] password = buildClientPassword(accessToken);
+    CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
+      password);
+    return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps,
+      callbackHandler);
+  }
+
+  /**
+   * Builds the client's user name for the general-purpose handshake, consisting
+   * of the base64-encoded serialized block access token identifier.  Note that
+   * this includes only the token identifier, not the token itself, which would
+   * include the password.  The password is a shared secret, and we must not
+   * write it on the network during the SASL authentication exchange.
+   *
+   * @param blockToken for block access
+   * @return SASL user name
+   */
+  private static String buildUserName(Token<BlockTokenIdentifier> blockToken) {
+    return new String(Base64.encodeBase64(blockToken.getIdentifier(), false),
+      Charsets.UTF_8);
+  }
+
+  /**
+   * Calculates the password on the client side for the general-purpose
+   * handshake.  The password consists of the block access token's password.
+   *
+   * @param blockToken for block access
+   * @return SASL password
+   */    
+  private char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
+    return new String(Base64.encodeBase64(blockToken.getPassword(), false),
+      Charsets.UTF_8).toCharArray();
+  }
+
+  /**
+   * This method actually executes the client-side SASL handshake.
+   *
+   * @param underlyingOut connection output stream
+   * @param underlyingIn connection input stream
+   * @param userName SASL user name
+   * @param saslProps properties of SASL negotiation
+   * @param callbackHandler for responding to SASL callbacks
+   * @return new pair of streams, wrapped after SASL negotiation
+   * @throws IOException for any error
+   */
+  private IOStreamPair doSaslHandshake(OutputStream underlyingOut,
+      InputStream underlyingIn, String userName, Map<String, String> saslProps,
+      CallbackHandler callbackHandler) throws IOException {
+
+    DataOutputStream out = new DataOutputStream(underlyingOut);
+    DataInputStream in = new DataInputStream(underlyingIn);
+
+    SaslParticipant sasl= SaslParticipant.createClientSaslParticipant(userName,
+      saslProps, callbackHandler);
+
+    out.writeInt(SASL_TRANSFER_MAGIC_NUMBER);
+    out.flush();
+
+    try {
+      // Start of handshake - "initial response" in SASL terminology.
+      sendSaslMessage(out, new byte[0]);
+
+      // step 1
+      performSaslStep1(out, in, sasl);
+
+      // step 2 (client-side only)
+      byte[] remoteResponse = readSaslMessage(in);
+      byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+      assert localResponse == null;
+
+      // SASL handshake is complete
+      checkSaslComplete(sasl, saslProps);
+
+      return sasl.createStreamPair(out, in);
+    } catch (IOException ioe) {
+      sendGenericSaslErrorMessage(out, ioe.getMessage());
+      throw ioe;
+    }
+  }
+}

+ 381 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java

@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.SaslException;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Negotiates SASL for DataTransferProtocol on behalf of a server.  There are
+ * two possible supported variants of SASL negotiation: either a general-purpose
+ * negotiation supporting any quality of protection, or a specialized
+ * negotiation that enforces privacy as the quality of protection using a
+ * cryptographically strong encryption key.
+ *
+ * This class is used in the DataNode for handling inbound connections.
+ */
+@InterfaceAudience.Private
+public class SaslDataTransferServer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+    SaslDataTransferServer.class);
+
+  private final BlockPoolTokenSecretManager blockPoolTokenSecretManager;
+  private final DNConf dnConf;
+
+  /**
+   * Creates a new SaslDataTransferServer.
+   *
+   * @param dnConf configuration of DataNode
+   * @param blockPoolTokenSecretManager used for checking block access tokens
+   *   and encryption keys
+   */
+  public SaslDataTransferServer(DNConf dnConf,
+      BlockPoolTokenSecretManager blockPoolTokenSecretManager) {
+    this.blockPoolTokenSecretManager = blockPoolTokenSecretManager;
+    this.dnConf = dnConf;
+  }
+
+  /**
+   * Receives SASL negotiation from a peer on behalf of a server.
+   *
+   * @param peer connection peer
+   * @param underlyingOut connection output stream
+   * @param underlyingIn connection input stream
+   * @param datanodeId ID of DataNode accepting connection
+   * @return new pair of streams, wrapped after SASL negotiation
+   * @throws IOException for any error
+   */
+  public IOStreamPair receive(Peer peer, OutputStream underlyingOut,
+      InputStream underlyingIn, DatanodeID datanodeId) throws IOException {
+    if (dnConf.getEncryptDataTransfer()) {
+      LOG.debug(
+        "SASL server doing encrypted handshake for peer = {}, datanodeId = {}",
+        peer, datanodeId);
+      return getEncryptedStreams(peer, underlyingOut, underlyingIn);
+    } else if (!UserGroupInformation.isSecurityEnabled()) {
+      LOG.debug(
+        "SASL server skipping handshake in unsecured configuration for "
+        + "peer = {}, datanodeId = {}", peer, datanodeId);
+      return new IOStreamPair(underlyingIn, underlyingOut);
+    } else if (datanodeId.getXferPort() < 1024) {
+      LOG.debug(
+        "SASL server skipping handshake in unsecured configuration for "
+        + "peer = {}, datanodeId = {}", peer, datanodeId);
+      return new IOStreamPair(underlyingIn, underlyingOut);
+    } else {
+      LOG.debug(
+        "SASL server doing general handshake for peer = {}, datanodeId = {}",
+        peer, datanodeId);
+      return getSaslStreams(peer, underlyingOut, underlyingIn, datanodeId);
+    }
+  }
+
+  /**
+   * Receives SASL negotiation for specialized encrypted handshake.
+   *
+   * @param peer connection peer
+   * @param underlyingOut connection output stream
+   * @param underlyingIn connection input stream
+   * @return new pair of streams, wrapped after SASL negotiation
+   * @throws IOException for any error
+   */
+  private IOStreamPair getEncryptedStreams(Peer peer,
+      OutputStream underlyingOut, InputStream underlyingIn) throws IOException {
+    if (peer.hasSecureChannel() ||
+        dnConf.getTrustedChannelResolver().isTrusted(getPeerAddress(peer))) {
+      return new IOStreamPair(underlyingIn, underlyingOut);
+    }
+
+    Map<String, String> saslProps = createSaslPropertiesForEncryption(
+      dnConf.getEncryptionAlgorithm());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Server using encryption algorithm " +
+        dnConf.getEncryptionAlgorithm());
+    }
+
+    CallbackHandler callbackHandler = new SaslServerCallbackHandler(
+      new PasswordFunction() {
+        @Override
+        public char[] apply(String userName) throws IOException {
+          return encryptionKeyToPassword(getEncryptionKeyFromUserName(userName));
+        }
+      });
+    return doSaslHandshake(underlyingOut, underlyingIn, saslProps,
+        callbackHandler);
+  }
+
+  /**
+   * The SASL handshake for encrypted vs. general-purpose uses different logic
+   * for determining the password.  This interface is used to parameterize that
+   * logic.  It's similar to a Guava Function, but we need to let it throw
+   * exceptions.
+   */
+  private interface PasswordFunction {
+
+    /**
+     * Returns the SASL password for the given user name.
+     *
+     * @param userName SASL user name
+     * @return SASL password
+     * @throws IOException for any error
+     */
+    char[] apply(String userName) throws IOException;
+  }
+
+  /**
+   * Sets user name and password when asked by the server-side SASL object.
+   */
+  private static final class SaslServerCallbackHandler
+      implements CallbackHandler {
+
+    private final PasswordFunction passwordFunction;
+
+    /**
+     * Creates a new SaslServerCallbackHandler.
+     *
+     * @param passwordFunction for determing the user's password
+     */
+    public SaslServerCallbackHandler(PasswordFunction passwordFunction) {
+      this.passwordFunction = passwordFunction;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException,
+        UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      AuthorizeCallback ac = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof AuthorizeCallback) {
+          ac = (AuthorizeCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          continue; // realm is ignored
+        } else {
+          throw new UnsupportedCallbackException(callback,
+              "Unrecognized SASL DIGEST-MD5 Callback: " + callback);
+        }
+      }
+
+      if (pc != null) {
+        pc.setPassword(passwordFunction.apply(nc.getDefaultName()));
+      }
+
+      if (ac != null) {
+        ac.setAuthorized(true);
+        ac.setAuthorizedID(ac.getAuthorizationID());
+      }
+    }
+  }
+
+  /**
+   * Given a secret manager and a username encoded for the encrypted handshake,
+   * determine the encryption key.
+   * 
+   * @param userName containing the keyId, blockPoolId, and nonce.
+   * @return secret encryption key.
+   * @throws IOException
+   */
+  private byte[] getEncryptionKeyFromUserName(String userName)
+      throws IOException {
+    String[] nameComponents = userName.split(NAME_DELIMITER);
+    if (nameComponents.length != 3) {
+      throw new IOException("Provided name '" + userName + "' has " +
+          nameComponents.length + " components instead of the expected 3.");
+    }
+    int keyId = Integer.parseInt(nameComponents[0]);
+    String blockPoolId = nameComponents[1];
+    byte[] nonce = Base64.decodeBase64(nameComponents[2]);
+    return blockPoolTokenSecretManager.retrieveDataEncryptionKey(keyId,
+        blockPoolId, nonce);
+  }
+
+  /**
+   * Receives SASL negotiation for general-purpose handshake.
+   *
+   * @param peer connection peer
+   * @param underlyingOut connection output stream
+   * @param underlyingIn connection input stream
+   * @param datanodeId ID of DataNode accepting connection
+   * @return new pair of streams, wrapped after SASL negotiation
+   * @throws IOException for any error
+   */
+  private IOStreamPair getSaslStreams(Peer peer, OutputStream underlyingOut,
+      InputStream underlyingIn, final DatanodeID datanodeId) throws IOException {
+    SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver();
+    if (saslPropsResolver == null) {
+      throw new IOException(String.format("Cannot create a secured " +
+        "connection if DataNode listens on unprivileged port (%d) and no " +
+        "protection is defined in configuration property %s.",
+        datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY));
+    }
+    Map<String, String> saslProps = saslPropsResolver.getServerProperties(
+      getPeerAddress(peer));
+
+    CallbackHandler callbackHandler = new SaslServerCallbackHandler(
+      new PasswordFunction() {
+        @Override
+        public char[] apply(String userName) throws IOException {
+          return buildServerPassword(userName);
+        }
+    });
+    return doSaslHandshake(underlyingOut, underlyingIn, saslProps,
+        callbackHandler);
+  }
+
+  /**
+   * Calculates the expected correct password on the server side for the
+   * general-purpose handshake.  The password consists of the block access
+   * token's password (known to the DataNode via its secret manager).  This
+   * expects that the client has supplied a user name consisting of its
+   * serialized block access token identifier.
+   *
+   * @param userName SASL user name containing serialized block access token
+   *   identifier
+   * @return expected correct SASL password
+   * @throws IOException for any error
+   */    
+  private char[] buildServerPassword(String userName) throws IOException {
+    BlockTokenIdentifier identifier = deserializeIdentifier(userName);
+    byte[] tokenPassword = blockPoolTokenSecretManager.retrievePassword(
+      identifier);
+    return (new String(Base64.encodeBase64(tokenPassword, false),
+      Charsets.UTF_8)).toCharArray();
+  }
+
+  /**
+   * Deserializes a base64-encoded binary representation of a block access
+   * token.
+   *
+   * @param str String to deserialize
+   * @return BlockTokenIdentifier deserialized from str
+   * @throws IOException if there is any I/O error
+   */
+  private BlockTokenIdentifier deserializeIdentifier(String str)
+      throws IOException {
+    BlockTokenIdentifier identifier = new BlockTokenIdentifier();
+    identifier.readFields(new DataInputStream(new ByteArrayInputStream(
+      Base64.decodeBase64(str))));
+    return identifier;
+  }
+
+  /**
+   * This method actually executes the server-side SASL handshake.
+   *
+   * @param underlyingOut connection output stream
+   * @param underlyingIn connection input stream
+   * @param saslProps properties of SASL negotiation
+   * @param callbackHandler for responding to SASL callbacks
+   * @return new pair of streams, wrapped after SASL negotiation
+   * @throws IOException for any error
+   */
+  private IOStreamPair doSaslHandshake(OutputStream underlyingOut,
+      InputStream underlyingIn, Map<String, String> saslProps,
+      CallbackHandler callbackHandler) throws IOException {
+
+    DataInputStream in = new DataInputStream(underlyingIn);
+    DataOutputStream out = new DataOutputStream(underlyingOut);
+
+    SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(saslProps,
+      callbackHandler);
+
+    int magicNumber = in.readInt();
+    if (magicNumber != SASL_TRANSFER_MAGIC_NUMBER) {
+      throw new InvalidMagicNumberException(magicNumber);
+    }
+    try {
+      // step 1
+      performSaslStep1(out, in, sasl);
+
+      // step 2 (server-side only)
+      byte[] remoteResponse = readSaslMessage(in);
+      byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
+      sendSaslMessage(out, localResponse);
+
+      // SASL handshake is complete
+      checkSaslComplete(sasl, saslProps);
+
+      return sasl.createStreamPair(out, in);
+    } catch (IOException ioe) {
+      if (ioe instanceof SaslException &&
+          ioe.getCause() != null &&
+          ioe.getCause() instanceof InvalidEncryptionKeyException) {
+        // This could just be because the client is long-lived and hasn't gotten
+        // a new encryption key from the NN in a while. Upon receiving this
+        // error, the client will get a new encryption key from the NN and retry
+        // connecting to this DN.
+        sendInvalidKeySaslErrorMessage(out, ioe.getCause().getMessage());
+      } else {
+        sendGenericSaslErrorMessage(out, ioe.getMessage());
+      }
+      throw ioe;
+    }
+  }
+
+  /**
+   * Sends a SASL negotiation message indicating an invalid key error.
+   *
+   * @param out stream to receive message
+   * @param message to send
+   * @throws IOException for any error
+   */
+  private static void sendInvalidKeySaslErrorMessage(DataOutputStream out,
+      String message) throws IOException {
+    sendSaslMessage(out, DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY, null,
+        message);
+  }
+}

+ 166 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java

@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.util.Map;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslOutputStream;
+
+/**
+ * Strongly inspired by Thrift's TSaslTransport class.
+ *
+ * Used to abstract over the <code>SaslServer</code> and
+ * <code>SaslClient</code> classes, which share a lot of their interface, but
+ * unfortunately don't share a common superclass.
+ */
+@InterfaceAudience.Private
+class SaslParticipant {
+
+  // This has to be set as part of the SASL spec, but it don't matter for
+  // our purposes, but may not be empty. It's sent over the wire, so use
+  // a short string.
+  private static final String SERVER_NAME = "0";
+  private static final String PROTOCOL = "hdfs";
+  private static final String MECHANISM = "DIGEST-MD5";
+
+  // One of these will always be null.
+  private final SaslServer saslServer;
+  private final SaslClient saslClient;
+
+  /**
+   * Creates a SaslParticipant wrapping a SaslServer.
+   *
+   * @param saslProps properties of SASL negotiation
+   * @param callbackHandler for handling all SASL callbacks
+   * @return SaslParticipant wrapping SaslServer
+   * @throws SaslException for any error
+   */
+  public static SaslParticipant createServerSaslParticipant(
+      Map<String, String> saslProps, CallbackHandler callbackHandler)
+      throws SaslException {
+    return new SaslParticipant(Sasl.createSaslServer(MECHANISM,
+      PROTOCOL, SERVER_NAME, saslProps, callbackHandler));
+  }
+
+  /**
+   * Creates a SaslParticipant wrapping a SaslClient.
+   *
+   * @param userName SASL user name
+   * @param saslProps properties of SASL negotiation
+   * @param callbackHandler for handling all SASL callbacks
+   * @return SaslParticipant wrapping SaslClient
+   * @throws SaslException for any error
+   */
+  public static SaslParticipant createClientSaslParticipant(String userName,
+      Map<String, String> saslProps, CallbackHandler callbackHandler)
+      throws SaslException {
+    return new SaslParticipant(Sasl.createSaslClient(new String[] { MECHANISM },
+      userName, PROTOCOL, SERVER_NAME, saslProps, callbackHandler));
+  }
+
+  /**
+   * Private constructor wrapping a SaslServer.
+   *
+   * @param saslServer to wrap
+   */
+  private SaslParticipant(SaslServer saslServer) {
+    this.saslServer = saslServer;
+    this.saslClient = null;
+  }
+
+  /**
+   * Private constructor wrapping a SaslClient.
+   *
+   * @param saslClient to wrap
+   */
+  private SaslParticipant(SaslClient saslClient) {
+    this.saslServer = null;
+    this.saslClient = saslClient;
+  }
+
+  /**
+   * @see {@link SaslServer#evaluateResponse}
+   * @see {@link SaslClient#evaluateChallenge}
+   */
+  public byte[] evaluateChallengeOrResponse(byte[] challengeOrResponse)
+      throws SaslException {
+    if (saslClient != null) {
+      return saslClient.evaluateChallenge(challengeOrResponse);
+    } else {
+      return saslServer.evaluateResponse(challengeOrResponse);
+    }
+  }
+
+  /**
+   * After successful SASL negotation, returns the negotiated quality of
+   * protection.
+   *
+   * @return negotiated quality of protection
+   */
+  public String getNegotiatedQop() {
+    if (saslClient != null) {
+      return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+    } else {
+      return (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+    }
+  }
+
+  /**
+   * Returns true if SASL negotiation is complete.
+   *
+   * @return true if SASL negotiation is complete
+   */
+  public boolean isComplete() {
+    if (saslClient != null) {
+      return saslClient.isComplete();
+    } else {
+      return saslServer.isComplete();
+    }
+  }
+
+  /**
+   * Return some input/output streams that may henceforth have their
+   * communication encrypted, depending on the negotiated quality of protection.
+   *
+   * @param out output stream to wrap
+   * @param in input stream to wrap
+   * @return IOStreamPair wrapping the streams
+   */
+  public IOStreamPair createStreamPair(DataOutputStream out,
+      DataInputStream in) {
+    if (saslClient != null) {
+      return new IOStreamPair(
+          new SaslInputStream(in, saslClient),
+          new SaslOutputStream(out, saslClient));
+    } else {
+      return new IOStreamPair(
+          new SaslInputStream(in, saslServer),
+          new SaslOutputStream(out, saslServer));
+    }
+  }
+}

+ 21 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
 import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.BufferedInputStream;
@@ -62,9 +64,11 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -202,6 +206,7 @@ public class Balancer {
   
   private final NameNodeConnector nnc;
   private final BalancingPolicy policy;
+  private final SaslDataTransferClient saslClient;
   private final double threshold;
   
   // all data node lists
@@ -352,19 +357,18 @@ public class Balancer {
         
         OutputStream unbufOut = sock.getOutputStream();
         InputStream unbufIn = sock.getInputStream();
-        if (nnc.getDataEncryptionKey() != null) {
-          IOStreamPair encryptedStreams =
-              DataTransferEncryptor.getEncryptedStreams(
-                  unbufOut, unbufIn, nnc.getDataEncryptionKey());
-          unbufOut = encryptedStreams.out;
-          unbufIn = encryptedStreams.in;
-        }
+        ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
+        Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
+        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+          unbufIn, nnc, accessToken, target.datanode);
+        unbufOut = saslStreams.out;
+        unbufIn = saslStreams.in;
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             HdfsConstants.IO_FILE_BUFFER_SIZE));
         in = new DataInputStream(new BufferedInputStream(unbufIn,
             HdfsConstants.IO_FILE_BUFFER_SIZE));
         
-        sendRequest(out);
+        sendRequest(out, eb, accessToken);
         receiveResponse(in);
         bytesMoved.addAndGet(block.getNumBytes());
         LOG.info("Successfully moved " + this);
@@ -395,9 +399,8 @@ public class Balancer {
     }
     
     /* Send a block replace request to the output stream*/
-    private void sendRequest(DataOutputStream out) throws IOException {
-      final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
-      final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
+    private void sendRequest(DataOutputStream out, ExtendedBlock eb,
+        Token<BlockTokenIdentifier> accessToken) throws IOException {
       new Sender(out).replaceBlock(eb, accessToken,
           source.getStorageID(), proxySource.getDatanode());
     }
@@ -876,6 +879,12 @@ public class Balancer {
     this.maxConcurrentMovesPerNode =
         conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+    this.saslClient = new SaslDataTransferClient(
+      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+      TrustedChannelResolver.getInstance(conf),
+      conf.getBoolean(
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
   }
   
   /* Given a data node set, build a network topology and decide

+ 6 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java

@@ -34,8 +34,8 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -50,7 +50,7 @@ import org.apache.hadoop.util.Daemon;
  * The class provides utilities for {@link Balancer} to access a NameNode
  */
 @InterfaceAudience.Private
-class NameNodeConnector {
+class NameNodeConnector implements DataEncryptionKeyFactory {
   private static final Log LOG = Balancer.LOG;
   private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
   private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
@@ -72,7 +72,6 @@ class NameNodeConnector {
   private BlockTokenSecretManager blockTokenSecretManager;
   private Daemon keyupdaterthread; // AccessKeyUpdater thread
   private DataEncryptionKey encryptionKey;
-  private final TrustedChannelResolver trustedChannelResolver;
 
   NameNodeConnector(URI nameNodeUri,
       Configuration conf) throws IOException {
@@ -122,7 +121,6 @@ class NameNodeConnector {
     if (out == null) {
       throw new IOException("Another balancer is running");
     }
-    this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
   }
 
   boolean shouldContinue(long dispatchBlockMoveBytes) {
@@ -154,10 +152,10 @@ class NameNodeConnector {
           BlockTokenSecretManager.AccessMode.COPY));
     }
   }
-  
-  DataEncryptionKey getDataEncryptionKey()
-      throws IOException {
-    if (encryptDataTransfer && !this.trustedChannelResolver.isTrusted()) {
+
+  @Override
+  public DataEncryptionKey newDataEncryptionKey() {
+    if (encryptDataTransfer) {
       synchronized (this) {
         if (encryptionKey == null) {
           encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();

+ 45 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java

@@ -52,7 +52,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.security.SaslPropertiesResolver;
 
 /**
  * Simple class encapsulating all of the configuration that the DataNode
@@ -86,6 +88,7 @@ public class DNConf {
   
   final String minimumNameNodeVersion;
   final String encryptionAlgorithm;
+  final SaslPropertiesResolver saslPropsResolver;
   final TrustedChannelResolver trustedChannelResolver;
   
   final long xceiverStopTimeout;
@@ -168,6 +171,8 @@ public class DNConf {
         DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
     this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
     this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
+    this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
+      conf);
     
     this.xceiverStopTimeout = conf.getLong(
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
@@ -186,7 +191,26 @@ public class DNConf {
   String getMinimumNameNodeVersion() {
     return this.minimumNameNodeVersion;
   }
-  
+
+  /**
+   * Returns true if encryption enabled for DataTransferProtocol.
+   *
+   * @return boolean true if encryption enabled for DataTransferProtocol
+   */
+  public boolean getEncryptDataTransfer() {
+    return encryptDataTransfer;
+  }
+
+  /**
+   * Returns encryption algorithm configured for DataTransferProtocol, or null
+   * if not configured.
+   *
+   * @return encryption algorithm configured for DataTransferProtocol
+   */
+  public String getEncryptionAlgorithm() {
+    return encryptionAlgorithm;
+  }
+
   public long getXceiverStopTimeout() {
     return xceiverStopTimeout;
   }
@@ -194,4 +218,24 @@ public class DNConf {
   public long getMaxLockedMemory() {
     return maxLockedMemory;
   }
+
+  /**
+   * Returns the SaslPropertiesResolver configured for use with
+   * DataTransferProtocol, or null if not configured.
+   *
+   * @return SaslPropertiesResolver configured for use with DataTransferProtocol
+   */
+  public SaslPropertiesResolver getSaslPropsResolver() {
+    return saslPropsResolver;
+  }
+
+  /**
+   * Returns the TrustedChannelResolver configured for use with
+   * DataTransferProtocol, or null if not configured.
+   *
+   * @return TrustedChannelResolver configured for use with DataTransferProtocol
+   */
+  public TrustedChannelResolver getTrustedChannelResolver() {
+    return trustedChannelResolver;
+  }
 }

+ 94 - 27
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -40,6 +43,9 @@ import org.apache.hadoop.hdfs.net.DomainPeerServer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -224,6 +230,8 @@ public class DataNode extends Configured
   private final List<String> usersWithLocalPathAccess;
   private final boolean connectToDnViaHostname;
   ReadaheadPool readaheadPool;
+  SaslDataTransferClient saslClient;
+  SaslDataTransferServer saslServer;
   private final boolean getHdfsBlockLocationsEnabled;
   private ObjectName dataNodeInfoBeanName;
   private Thread checkDiskErrorThread = null;
@@ -722,15 +730,10 @@ public class DataNode extends Configured
    */
   void startDataNode(Configuration conf, 
                      List<StorageLocation> dataDirs,
-                    // DatanodeProtocol namenode,
                      SecureResources resources
                      ) throws IOException {
-    if(UserGroupInformation.isSecurityEnabled() && resources == null) {
-      if (!conf.getBoolean("ignore.secure.ports.for.testing", false)) {
-        throw new RuntimeException("Cannot start secure cluster without "
-            + "privileged resources.");
-      }
-    }
+
+    checkSecureConfig(conf, resources);
 
     // settings global for all BPs in the Data Node
     this.secureResources = resources;
@@ -790,6 +793,55 @@ public class DataNode extends Configured
     // Create the ReadaheadPool from the DataNode context so we can
     // exit without having to explicitly shutdown its thread pool.
     readaheadPool = ReadaheadPool.getInstance();
+    saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
+      dnConf.trustedChannelResolver,
+      conf.getBoolean(
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
+    saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
+  }
+
+  /**
+   * Checks if the DataNode has a secure configuration if security is enabled.
+   * There are 2 possible configurations that are considered secure:
+   * 1. The server has bound to privileged ports for RPC and HTTP via
+   *   SecureDataNodeStarter.
+   * 2. The configuration enables SASL on DataTransferProtocol and HTTPS (no
+   *   plain HTTP) for the HTTP server.  The SASL handshake guarantees
+   *   authentication of the RPC server before a client transmits a secret, such
+   *   as a block access token.  Similarly, SSL guarantees authentication of the
+   *   HTTP server before a client transmits a secret, such as a delegation
+   *   token.
+   * It is not possible to run with both privileged ports and SASL on
+   * DataTransferProtocol.  For backwards-compatibility, the connection logic
+   * must check if the target port is a privileged port, and if so, skip the
+   * SASL handshake.
+   *
+   * @param conf Configuration to check
+   * @param resources SecuredResources obtained for DataNode
+   * @throws RuntimeException if security enabled, but configuration is insecure
+   */
+  private static void checkSecureConfig(Configuration conf,
+      SecureResources resources) throws RuntimeException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
+    if (resources != null && dataTransferProtection == null) {
+      return;
+    }
+    if (conf.getBoolean("ignore.secure.ports.for.testing", false)) {
+      return;
+    }
+    if (dataTransferProtection != null &&
+        DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY &&
+        resources == null) {
+      return;
+    }
+    throw new RuntimeException("Cannot start secure DataNode without " +
+      "configuring either privileged resources or SASL RPC data transfer " +
+      "protection and SSL for HTTP.  Using privileged resources in " +
+      "combination with SASL RPC data transfer protection is not supported.");
   }
   
   public static String generateUuid() {
@@ -1623,20 +1675,25 @@ public class DataNode extends Configured
         NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
         sock.setSoTimeout(targets.length * dnConf.socketTimeout);
 
+        //
+        // Header info
+        //
+        Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
+        if (isBlockTokenEnabled) {
+          accessToken = blockPoolTokenSecretManager.generateToken(b, 
+              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
+        }
+
         long writeTimeout = dnConf.socketWriteTimeout + 
                             HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
         OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
         InputStream unbufIn = NetUtils.getInputStream(sock);
-        if (dnConf.encryptDataTransfer && 
-            !dnConf.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
-          IOStreamPair encryptedStreams =
-              DataTransferEncryptor.getEncryptedStreams(
-                  unbufOut, unbufIn,
-                  blockPoolTokenSecretManager.generateDataEncryptionKey(
-                      b.getBlockPoolId()));
-          unbufOut = encryptedStreams.out;
-          unbufIn = encryptedStreams.in;
-        }
+        DataEncryptionKeyFactory keyFactory =
+          getDataEncryptionKeyFactoryForBlock(b);
+        IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+          unbufIn, keyFactory, accessToken, bpReg);
+        unbufOut = saslStreams.out;
+        unbufIn = saslStreams.in;
         
         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             HdfsConstants.SMALL_BUFFER_SIZE));
@@ -1645,15 +1702,6 @@ public class DataNode extends Configured
             false, false, true, DataNode.this, null, cachingStrategy);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
-        //
-        // Header info
-        //
-        Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
-        if (isBlockTokenEnabled) {
-          accessToken = blockPoolTokenSecretManager.generateToken(b, 
-              EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
-        }
-
         new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
             stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
 
@@ -1696,7 +1744,26 @@ public class DataNode extends Configured
       }
     }
   }
-  
+
+  /**
+   * Returns a new DataEncryptionKeyFactory that generates a key from the
+   * BlockPoolTokenSecretManager, using the block pool ID of the given block.
+   *
+   * @param block for which the factory needs to create a key
+   * @return DataEncryptionKeyFactory for block's block pool ID
+   */
+  DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
+      final ExtendedBlock block) {
+    return new DataEncryptionKeyFactory() {
+      @Override
+      public DataEncryptionKey newDataEncryptionKey() {
+        return dnConf.encryptDataTransfer ?
+          blockPoolTokenSecretManager.generateDataEncryptionKey(
+            block.getBlockPoolId()) : null;
+      }
+    };
+  }
+
   /**
    * After a block becomes finalized, a datanode increases metric counter,
    * notifies namenode, and adds it to the block scanner

+ 18 - 58
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -36,11 +36,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
-import java.net.UnknownHostException;
 import java.nio.channels.ClosedChannelException;
 import java.security.MessageDigest;
 import java.util.Arrays;
@@ -52,13 +50,12 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
@@ -85,7 +82,6 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.base.Preconditions;
-import com.google.common.net.InetAddresses;
 import com.google.protobuf.ByteString;
 
 
@@ -174,24 +170,11 @@ class DataXceiver extends Receiver implements Runnable {
       dataXceiverServer.addPeer(peer, Thread.currentThread());
       peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
       InputStream input = socketIn;
-      if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer &&
-          !dnConf.trustedChannelResolver.isTrusted(getClientAddress(peer))){
-        IOStreamPair encryptedStreams = null;
-        try {
-          encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut,
-              socketIn, datanode.blockPoolTokenSecretManager,
-              dnConf.encryptionAlgorithm);
-        } catch (InvalidMagicNumberException imne) {
-          LOG.info("Failed to read expected encryption handshake from client " +
-              "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
-              "is running an older version of Hadoop which does not support " +
-              "encryption");
-          return;
-        }
-        input = encryptedStreams.in;
-        socketOut = encryptedStreams.out;
-      }
-      input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE);
+      IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
+        socketIn, datanode.getDatanodeId());
+      input = new BufferedInputStream(saslStreams.in,
+        HdfsConstants.SMALL_BUFFER_SIZE);
+      socketOut = saslStreams.out;
       
       super.initialize(new DataInputStream(input));
       
@@ -263,19 +246,6 @@ class DataXceiver extends Receiver implements Runnable {
       }
     }
   }
-  
-  /**
-   * Returns InetAddress from peer
-   * The getRemoteAddressString is the form  /ip-address:port
-   * The ip-address is extracted from peer and InetAddress is formed
-   * @param peer
-   * @return
-   * @throws UnknownHostException
-   */
-  private static InetAddress getClientAddress(Peer peer) {
-    return InetAddresses.forString(
-        peer.getRemoteAddressString().split(":")[0].substring(1));
-  }
 
   @Override
   public void requestShortCircuitFds(final ExtendedBlock blk,
@@ -656,17 +626,12 @@ class DataXceiver extends Receiver implements Runnable {
           OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
               writeTimeout);
           InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
-          if (dnConf.encryptDataTransfer &&
-              !dnConf.trustedChannelResolver.isTrusted(mirrorSock.getInetAddress())) {
-            IOStreamPair encryptedStreams =
-                DataTransferEncryptor.getEncryptedStreams(
-                    unbufMirrorOut, unbufMirrorIn,
-                    datanode.blockPoolTokenSecretManager
-                        .generateDataEncryptionKey(block.getBlockPoolId()));
-            
-            unbufMirrorOut = encryptedStreams.out;
-            unbufMirrorIn = encryptedStreams.in;
-          }
+          DataEncryptionKeyFactory keyFactory =
+            datanode.getDataEncryptionKeyFactoryForBlock(block);
+          IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
+            unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
+          unbufMirrorOut = saslStreams.out;
+          unbufMirrorIn = saslStreams.in;
           mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
               HdfsConstants.SMALL_BUFFER_SIZE));
           mirrorIn = new DataInputStream(unbufMirrorIn);
@@ -1026,17 +991,12 @@ class DataXceiver extends Receiver implements Runnable {
       OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
           dnConf.socketWriteTimeout);
       InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
-      if (dnConf.encryptDataTransfer && 
-          !dnConf.trustedChannelResolver.isTrusted(
-              proxySock.getInetAddress())) {
-        IOStreamPair encryptedStreams =
-            DataTransferEncryptor.getEncryptedStreams(
-                unbufProxyOut, unbufProxyIn,
-                datanode.blockPoolTokenSecretManager
-                    .generateDataEncryptionKey(block.getBlockPoolId()));
-        unbufProxyOut = encryptedStreams.out;
-        unbufProxyIn = encryptedStreams.in;
-      }
+      DataEncryptionKeyFactory keyFactory =
+        datanode.getDataEncryptionKeyFactoryForBlock(block);
+      IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
+        unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
+      unbufProxyOut = saslStreams.out;
+      unbufProxyIn = saslStreams.in;
       
       proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, 
           HdfsConstants.SMALL_BUFFER_SIZE));

+ 29 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.RemotePeerFactory;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -55,6 +59,12 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
@@ -65,6 +75,7 @@ import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -92,7 +103,7 @@ import com.google.common.annotations.VisibleForTesting;
  *  factors of each file.
  */
 @InterfaceAudience.Private
-public class NamenodeFsck {
+public class NamenodeFsck implements DataEncryptionKeyFactory {
   public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
   
   // return string marking fsck status
@@ -149,6 +160,7 @@ public class NamenodeFsck {
   private List<String> snapshottableDirs = null;
 
   private final BlockPlacementPolicy bpPolicy;
+  private final SaslDataTransferClient saslClient;
 
   /**
    * Filesystem checker.
@@ -175,6 +187,12 @@ public class NamenodeFsck {
         networktopology,
         namenode.getNamesystem().getBlockManager().getDatanodeManager()
         .getHost2DatanodeMap());
+    this.saslClient = new SaslDataTransferClient(
+      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+      TrustedChannelResolver.getInstance(conf),
+      conf.getBoolean(
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
     
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
@@ -616,15 +634,16 @@ public class NamenodeFsck {
             setConfiguration(namenode.conf).
             setRemotePeerFactory(new RemotePeerFactory() {
               @Override
-              public Peer newConnectedPeer(InetSocketAddress addr)
+              public Peer newConnectedPeer(InetSocketAddress addr,
+                  Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
                   throws IOException {
                 Peer peer = null;
                 Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
                 try {
                   s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
                   s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-                  peer = TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
-                        getDataEncryptionKey());
+                  peer = TcpPeerServer.peerFromSocketAndKey(saslClient, s,
+                        NamenodeFsck.this, blockToken, datanodeId);
                 } finally {
                   if (peer == null) {
                     IOUtils.closeQuietly(s);
@@ -663,7 +682,12 @@ public class NamenodeFsck {
       throw new Exception("Could not copy block data for " + lblock.getBlock());
     }
   }
-      
+
+  @Override
+  public DataEncryptionKey newDataEncryptionKey() throws IOException {
+    return namenode.getRpcServer().getDataEncryptionKey();
+  }
+
   /*
    * XXX (ab) See comment above for copyBlock().
    *

+ 31 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -1451,6 +1451,37 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.data.transfer.protection</name>
+  <value></value>
+  <description>
+    A comma-separated list of SASL protection values used for secured
+    connections to the DataNode when reading or writing block data.  Possible
+    values are authentication, integrity and privacy.  authentication means
+    authentication only and no integrity or privacy; integrity implies
+    authentication and integrity are enabled; and privacy implies all of
+    authentication, integrity and privacy are enabled.  If
+    dfs.encrypt.data.transfer is set to true, then it supersedes the setting for
+    dfs.data.transfer.protection and enforces that all connections must use a
+    specialized encrypted SASL handshake.  This property is ignored for
+    connections to a DataNode listening on a privileged port.  In this case, it
+    is assumed that the use of a privileged port establishes sufficient trust.
+  </description>
+</property>
+
+<property>
+  <name>dfs.data.transfer.saslproperties.resolver.class</name>
+  <value></value>
+  <description>
+    SaslPropertiesResolver used to resolve the QOP used for a connection to the
+    DataNode when reading or writing block data.  If not specified, the full set
+    of values specified in dfs.data.transfer.protection is used while
+    determining the QOP used for the connection. If a class is specified, then
+    the QOP values returned by the class will be used while determining the QOP
+    used for the connection.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
   <value>false</value>

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java

@@ -33,9 +33,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 
@@ -192,7 +195,8 @@ public class BlockReaderTestUtil {
       setAllowShortCircuitLocalReads(true).
       setRemotePeerFactory(new RemotePeerFactory() {
         @Override
-        public Peer newConnectedPeer(InetSocketAddress addr)
+        public Peer newConnectedPeer(InetSocketAddress addr,
+            Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
             throws IOException {
           Peer peer = null;
           Socket sock = NetUtils.
@@ -251,4 +255,4 @@ public class BlockReaderTestUtil {
     LogManager.getLogger(DataNode.class.getName()).setLevel(
         Level.TRACE);
   }
-}
+}

+ 33 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -19,12 +19,15 @@ package org.apache.hadoop.hdfs;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
@@ -1308,15 +1311,42 @@ public class MiniDFSCluster {
       }
 
       SecureResources secureResources = null;
-      if (UserGroupInformation.isSecurityEnabled()) {
+      if (UserGroupInformation.isSecurityEnabled() &&
+          conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY) == null) {
         try {
           secureResources = SecureDataNodeStarter.getSecureResources(dnConf);
         } catch (Exception ex) {
           ex.printStackTrace();
         }
       }
-      DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf,
-                                                 secureResources);
+      final int maxRetriesOnSasl = conf.getInt(
+        IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY,
+        IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT);
+      int numRetries = 0;
+      DataNode dn = null;
+      while (true) {
+        try {
+          dn = DataNode.instantiateDataNode(dnArgs, dnConf,
+                                            secureResources);
+          break;
+        } catch (IOException e) {
+          // Work around issue testing security where rapidly starting multiple
+          // DataNodes using the same principal gets rejected by the KDC as a
+          // replay attack.
+          if (UserGroupInformation.isSecurityEnabled() &&
+              numRetries < maxRetriesOnSasl) {
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException ie) {
+              Thread.currentThread().interrupt();
+              break;
+            }
+            ++numRetries;
+            continue;
+          }
+          throw e;
+        }
+      }
       if(dn == null)
         throw new IOException("Cannot start DataNode in "
             + dnConf.get(DFS_DATANODE_DATA_DIR_KEY));

+ 110 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferTestCase.java

@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public abstract class SaslDataTransferTestCase {
+
+  private static File baseDir;
+  private static String hdfsPrincipal;
+  private static MiniKdc kdc;
+  private static String keytab;
+  private static String spnegoPrincipal;
+
+  @BeforeClass
+  public static void initKdc() throws Exception {
+    baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"),
+      SaslDataTransferTestCase.class.getSimpleName());
+    FileUtil.fullyDelete(baseDir);
+    assertTrue(baseDir.mkdirs());
+
+    Properties kdcConf = MiniKdc.createConf();
+    kdc = new MiniKdc(kdcConf, baseDir);
+    kdc.start();
+
+    String userName = UserGroupInformation.getLoginUser().getShortUserName();
+    File keytabFile = new File(baseDir, userName + ".keytab");
+    keytab = keytabFile.getAbsolutePath();
+    kdc.createPrincipal(keytabFile, userName + "/localhost", "HTTP/localhost");
+    hdfsPrincipal = userName + "/localhost@" + kdc.getRealm();
+    spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();
+  }
+
+  @AfterClass
+  public static void shutdownKdc() {
+    if (kdc != null) {
+      kdc.stop();
+    }
+    FileUtil.fullyDelete(baseDir);
+  }
+
+  /**
+   * Creates configuration for starting a secure cluster.
+   *
+   * @param dataTransferProtection supported QOPs
+   * @return configuration for starting a secure cluster
+   * @throws Exception if there is any failure
+   */
+  protected HdfsConfiguration createSecureConfig(
+      String dataTransferProtection) throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
+    conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
+    conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
+    conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);
+    conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
+    conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);
+    conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, dataTransferProtection);
+    conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+    conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+    conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
+
+    String keystoresDir = baseDir.getAbsolutePath();
+    String sslConfDir = KeyStoreTestUtil.getClasspathDir(this.getClass());
+    KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);
+    return conf;
+  }
+}

+ 155 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java

@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestSaslDataTransfer extends SaslDataTransferTestCase {
+
+  private static final int BLOCK_SIZE = 4096;
+  private static final int BUFFER_SIZE= 1024;
+  private static final int NUM_BLOCKS = 3;
+  private static final Path PATH  = new Path("/file1");
+  private static final short REPLICATION = 3;
+
+  private MiniDFSCluster cluster;
+  private FileSystem fs;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @After
+  public void shutdown() {
+    IOUtils.cleanup(null, fs);
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testAuthentication() throws Exception {
+    HdfsConfiguration clusterConf = createSecureConfig(
+      "authentication,integrity,privacy");
+    startCluster(clusterConf);
+    HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+    clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
+    doTest(clientConf);
+  }
+
+  @Test
+  public void testIntegrity() throws Exception {
+    HdfsConfiguration clusterConf = createSecureConfig(
+      "authentication,integrity,privacy");
+    startCluster(clusterConf);
+    HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+    clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "integrity");
+    doTest(clientConf);
+  }
+
+  @Test
+  public void testPrivacy() throws Exception {
+    HdfsConfiguration clusterConf = createSecureConfig(
+      "authentication,integrity,privacy");
+    startCluster(clusterConf);
+    HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+    clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "privacy");
+    doTest(clientConf);
+  }
+
+  @Test
+  public void testClientAndServerDoNotHaveCommonQop() throws Exception {
+    HdfsConfiguration clusterConf = createSecureConfig("privacy");
+    startCluster(clusterConf);
+    HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+    clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
+    exception.expect(IOException.class);
+    exception.expectMessage("could only be replicated to 0 nodes");
+    doTest(clientConf);
+  }
+
+  @Test
+  public void testClientSaslNoServerSasl() throws Exception {
+    HdfsConfiguration clusterConf = createSecureConfig("");
+    startCluster(clusterConf);
+    HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+    clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
+    exception.expect(IOException.class);
+    exception.expectMessage("could only be replicated to 0 nodes");
+    doTest(clientConf);
+  }
+
+  @Test
+  public void testServerSaslNoClientSasl() throws Exception {
+    HdfsConfiguration clusterConf = createSecureConfig(
+      "authentication,integrity,privacy");
+    startCluster(clusterConf);
+    HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+    clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "");
+    exception.expect(IOException.class);
+    exception.expectMessage("could only be replicated to 0 nodes");
+    doTest(clientConf);
+  }
+
+  /**
+   * Tests DataTransferProtocol with the given client configuration.
+   *
+   * @param conf client configuration
+   * @throws IOException if there is an I/O error
+   */
+  private void doTest(HdfsConfiguration conf) throws IOException {
+    fs = FileSystem.get(cluster.getURI(), conf);
+    FileSystemTestHelper.createFile(fs, PATH, NUM_BLOCKS, BLOCK_SIZE);
+    assertArrayEquals(FileSystemTestHelper.getFileData(NUM_BLOCKS, BLOCK_SIZE),
+      DFSTestUtil.readFile(fs, PATH).getBytes("UTF-8"));
+    BlockLocation[] blockLocations = fs.getFileBlockLocations(PATH, 0,
+      Long.MAX_VALUE);
+    assertNotNull(blockLocations);
+    assertEquals(NUM_BLOCKS, blockLocations.length);
+    for (BlockLocation blockLocation: blockLocations) {
+      assertNotNull(blockLocation.getHosts());
+      assertEquals(3, blockLocation.getHosts().length);
+    }
+  }
+
+  /**
+   * Starts a cluster with the given configuration.
+   *
+   * @param conf cluster configuration
+   * @throws IOException if there is an I/O error
+   */
+  private void startCluster(HdfsConfiguration conf) throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+  }
+}

+ 41 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithSaslDataTransfer.java

@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
+import org.junit.Test;
+
+public class TestBalancerWithSaslDataTransfer extends SaslDataTransferTestCase {
+
+  private static final TestBalancer TEST_BALANCER = new TestBalancer();
+
+  @Test
+  public void testBalancer0Authentication() throws Exception {
+    TEST_BALANCER.testBalancer0Internal(createSecureConfig("authentication"));
+  }
+
+  @Test
+  public void testBalancer0Integrity() throws Exception {
+    TEST_BALANCER.testBalancer0Internal(createSecureConfig("integrity"));
+  }
+
+  @Test
+  public void testBalancer0Privacy() throws Exception {
+    TEST_BALANCER.testBalancer0Internal(createSecureConfig("privacy"));
+  }
+}

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.RemotePeerFactory;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -160,7 +161,8 @@ public class TestBlockTokenWithDFS {
           setConfiguration(conf).
           setRemotePeerFactory(new RemotePeerFactory() {
             @Override
-            public Peer newConnectedPeer(InetSocketAddress addr)
+            public Peer newConnectedPeer(InetSocketAddress addr,
+                Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
                 throws IOException {
               Peer peer = null;
               Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -46,9 +46,11 @@ import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -58,6 +60,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -307,7 +310,8 @@ public class TestDataNodeVolumeFailure {
       setConfiguration(conf).
       setRemotePeerFactory(new RemotePeerFactory() {
         @Override
-        public Peer newConnectedPeer(InetSocketAddress addr)
+        public Peer newConnectedPeer(InetSocketAddress addr,
+            Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
             throws IOException {
           Peer peer = null;
           Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();