Browse Source

HDFS-7073. Allow falling back to a non-SASL connection on DataTransferProtocol in several edge cases. Contributed by Chris Nauroth.

cnauroth 10 năm trước cách đây
mục cha
commit
f85cc14eb4
22 tập tin đã thay đổi với 382 bổ sung111 xóa
  1. 59 9
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
  2. 18 4
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
  3. 36 2
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
  4. 9 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
  5. 23 4
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
  6. 11 1
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
  7. 3 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  8. 15 8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  9. 4 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  10. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
  11. 64 10
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
  12. 32 16
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
  13. 19 7
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
  14. 1 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
  15. 3 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
  16. 17 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
  17. 13 13
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  18. 4 16
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
  19. 14 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
  20. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
  21. 34 11
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java
  22. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java

+ 59 - 9
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -687,7 +687,8 @@ public class Client {
      * a header to the server and starts
      * the connection thread that waits for responses.
      */
-    private synchronized void setupIOstreams() {
+    private synchronized void setupIOstreams(
+        AtomicBoolean fallbackToSimpleAuth) {
       if (socket != null || shouldCloseConnection.get()) {
         return;
       } 
@@ -738,11 +739,18 @@ public class Client {
               remoteId.saslQop =
                   (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
               LOG.debug("Negotiated QOP is :" + remoteId.saslQop);
-            } else if (UserGroupInformation.isSecurityEnabled() &&
-                       !fallbackAllowed) {
-              throw new IOException("Server asks us to fall back to SIMPLE " +
-                  "auth, but this client is configured to only allow secure " +
-                  "connections.");
+              if (fallbackToSimpleAuth != null) {
+                fallbackToSimpleAuth.set(false);
+              }
+            } else if (UserGroupInformation.isSecurityEnabled()) {
+              if (!fallbackAllowed) {
+                throw new IOException("Server asks us to fall back to SIMPLE " +
+                    "auth, but this client is configured to only allow secure " +
+                    "connections.");
+              }
+              if (fallbackToSimpleAuth != null) {
+                fallbackToSimpleAuth.set(true);
+              }
             }
           }
         
@@ -1375,6 +1383,26 @@ public class Client {
   /** 
    * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
    * <code>remoteId</code>, returning the rpc respond.
+   *
+   * @param rpcKind
+   * @param rpcRequest -  contains serialized method and method parameters
+   * @param remoteId - the target rpc server
+   * @param fallbackToSimpleAuth - set to true or false during this method to
+   *   indicate if a secure client falls back to simple auth
+   * @returns the rpc response
+   * Throws exceptions if there are network problems or if the remote code
+   * threw an exception.
+   */
+  public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
+      ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)
+      throws IOException {
+    return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
+      fallbackToSimpleAuth);
+  }
+
+  /**
+   * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
+   * <code>remoteId</code>, returning the rpc response.
    * 
    * @param rpcKind
    * @param rpcRequest -  contains serialized method and method parameters
@@ -1386,8 +1414,29 @@ public class Client {
    */
   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
       ConnectionId remoteId, int serviceClass) throws IOException {
+    return call(rpcKind, rpcRequest, remoteId, serviceClass, null);
+  }
+
+  /**
+   * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
+   * <code>remoteId</code>, returning the rpc response.
+   *
+   * @param rpcKind
+   * @param rpcRequest -  contains serialized method and method parameters
+   * @param remoteId - the target rpc server
+   * @param serviceClass - service class for RPC
+   * @param fallbackToSimpleAuth - set to true or false during this method to
+   *   indicate if a secure client falls back to simple auth
+   * @returns the rpc response
+   * Throws exceptions if there are network problems or if the remote code
+   * threw an exception.
+   */
+  public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
+      ConnectionId remoteId, int serviceClass,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException {
     final Call call = createCall(rpcKind, rpcRequest);
-    Connection connection = getConnection(remoteId, call, serviceClass);
+    Connection connection = getConnection(remoteId, call, serviceClass,
+      fallbackToSimpleAuth);
     try {
       connection.sendRpcRequest(call);                 // send the rpc request
     } catch (RejectedExecutionException e) {
@@ -1444,7 +1493,8 @@ public class Client {
   /** Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given ConnectionId are reused. */
   private Connection getConnection(ConnectionId remoteId,
-      Call call, int serviceClass) throws IOException {
+      Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
+      throws IOException {
     if (!running.get()) {
       // the client is stopped
       throw new IOException("The client is stopped");
@@ -1468,7 +1518,7 @@ public class Client {
     //block above. The reason for that is if the server happens to be slow,
     //it will take longer to establish a connection and that will slow the
     //entire system down.
-    connection.setupIOstreams();
+    connection.setupIOstreams(fallbackToSimpleAuth);
     return connection;
   }
   

+ 18 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

@@ -27,6 +27,7 @@ import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
 
@@ -84,14 +85,23 @@ public class ProtobufRpcEngine implements RpcEngine {
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
       InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
       SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
       ) throws IOException {
+    return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+      rpcTimeout, connectionRetryPolicy, null);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+      SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException {
 
     final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
-        rpcTimeout, connectionRetryPolicy);
+        rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
     return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
         protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
   }
@@ -115,13 +125,16 @@ public class ProtobufRpcEngine implements RpcEngine {
     private final Client client;
     private final long clientProtocolVersion;
     private final String protocolName;
+    private AtomicBoolean fallbackToSimpleAuth;
 
     private Invoker(Class<?> protocol, InetSocketAddress addr,
         UserGroupInformation ticket, Configuration conf, SocketFactory factory,
-        int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException {
+        int rpcTimeout, RetryPolicy connectionRetryPolicy,
+        AtomicBoolean fallbackToSimpleAuth) throws IOException {
       this(protocol, Client.ConnectionId.getConnectionId(
           addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
           conf, factory);
+      this.fallbackToSimpleAuth = fallbackToSimpleAuth;
     }
     
     /**
@@ -217,7 +230,8 @@ public class ProtobufRpcEngine implements RpcEngine {
       final RpcResponseWrapper val;
       try {
         val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
-            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId);
+            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
+            fallbackToSimpleAuth);
 
       } catch (Throwable e) {
         if (LOG.isTraceEnabled()) {

+ 36 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

@@ -33,6 +33,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
 
@@ -524,6 +525,7 @@ public class RPC {
    * @param conf configuration
    * @param factory socket factory
    * @param rpcTimeout max time for each rpc; 0 means no timeout
+   * @param connectionRetryPolicy retry policy
    * @return the proxy
    * @throws IOException if any error occurs
    */
@@ -535,11 +537,43 @@ public class RPC {
                                 SocketFactory factory,
                                 int rpcTimeout,
                                 RetryPolicy connectionRetryPolicy) throws IOException {    
+     return getProtocolProxy(protocol, clientVersion, addr, ticket,
+       conf, factory, rpcTimeout, connectionRetryPolicy, null);
+   }
+
+  /**
+   * Get a protocol proxy that contains a proxy connection to a remote server
+   * and a set of methods that are supported by the server
+   *
+   * @param protocol protocol
+   * @param clientVersion client's version
+   * @param addr server address
+   * @param ticket security ticket
+   * @param conf configuration
+   * @param factory socket factory
+   * @param rpcTimeout max time for each rpc; 0 means no timeout
+   * @param connectionRetryPolicy retry policy
+   * @param fallbackToSimpleAuth set to true or false during calls to indicate if
+   *   a secure client falls back to simple auth
+   * @return the proxy
+   * @throws IOException if any error occurs
+   */
+   public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
+                                long clientVersion,
+                                InetSocketAddress addr,
+                                UserGroupInformation ticket,
+                                Configuration conf,
+                                SocketFactory factory,
+                                int rpcTimeout,
+                                RetryPolicy connectionRetryPolicy,
+                                AtomicBoolean fallbackToSimpleAuth)
+       throws IOException {
     if (UserGroupInformation.isSecurityEnabled()) {
       SaslRpcServer.init(conf);
     }
-    return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion,
-        addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);
+    return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
+        addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
+        fallbackToSimpleAuth);
   }
 
    /**

+ 9 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
 
@@ -43,6 +44,14 @@ public interface RpcEngine {
                   SocketFactory factory, int rpcTimeout,
                   RetryPolicy connectionRetryPolicy) throws IOException;
 
+  /** Construct a client-side proxy object. */
+  <T> ProtocolProxy<T> getProxy(Class<T> protocol,
+                  long clientVersion, InetSocketAddress addr,
+                  UserGroupInformation ticket, Configuration conf,
+                  SocketFactory factory, int rpcTimeout,
+                  RetryPolicy connectionRetryPolicy,
+                  AtomicBoolean fallbackToSimpleAuth) throws IOException;
+
   /** 
    * Construct a server for a protocol implementation instance.
    * 

+ 23 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java

@@ -24,6 +24,7 @@ import java.lang.reflect.InvocationTargetException;
 
 import java.net.InetSocketAddress;
 import java.io.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
 
@@ -212,14 +213,17 @@ public class WritableRpcEngine implements RpcEngine {
     private Client.ConnectionId remoteId;
     private Client client;
     private boolean isClosed = false;
+    private final AtomicBoolean fallbackToSimpleAuth;
 
     public Invoker(Class<?> protocol,
                    InetSocketAddress address, UserGroupInformation ticket,
                    Configuration conf, SocketFactory factory,
-                   int rpcTimeout) throws IOException {
+                   int rpcTimeout, AtomicBoolean fallbackToSimpleAuth)
+        throws IOException {
       this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
           ticket, rpcTimeout, conf);
       this.client = CLIENTS.getClient(conf, factory);
+      this.fallbackToSimpleAuth = fallbackToSimpleAuth;
     }
 
     @Override
@@ -238,7 +242,8 @@ public class WritableRpcEngine implements RpcEngine {
       ObjectWritable value;
       try {
         value = (ObjectWritable)
-          client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), remoteId);
+          client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
+            remoteId, fallbackToSimpleAuth);
       } finally {
         if (traceScope != null) traceScope.close();
       }
@@ -275,11 +280,25 @@ public class WritableRpcEngine implements RpcEngine {
    * talking to a server at the named address. 
    * @param <T>*/
   @Override
-  @SuppressWarnings("unchecked")
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
                          InetSocketAddress addr, UserGroupInformation ticket,
                          Configuration conf, SocketFactory factory,
                          int rpcTimeout, RetryPolicy connectionRetryPolicy)
+    throws IOException {
+    return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+      rpcTimeout, connectionRetryPolicy, null);
+  }
+
+  /** Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address. 
+   * @param <T>*/
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+                         InetSocketAddress addr, UserGroupInformation ticket,
+                         Configuration conf, SocketFactory factory,
+                         int rpcTimeout, RetryPolicy connectionRetryPolicy,
+                         AtomicBoolean fallbackToSimpleAuth)
     throws IOException {    
 
     if (connectionRetryPolicy != null) {
@@ -289,7 +308,7 @@ public class WritableRpcEngine implements RpcEngine {
 
     T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
         new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
-            factory, rpcTimeout));
+            factory, rpcTimeout, fallbackToSimpleAuth));
     return new ProtocolProxy<T>(protocol, proxy, true);
   }
   

+ 11 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -276,12 +276,22 @@ public class TestRPC {
    */
   private static class StoppedRpcEngine implements RpcEngine {
 
-    @SuppressWarnings("unchecked")
     @Override
     public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
         InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
         SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
         ) throws IOException {
+      return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
+        rpcTimeout, connectionRetryPolicy, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
+        InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+        SocketFactory factory, int rpcTimeout,
+        RetryPolicy connectionRetryPolicy, AtomicBoolean fallbackToSimpleAuth
+        ) throws IOException {
       T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
               new Class[] { protocol }, new StoppedInvocationHandler());
       return new ProtocolProxy<T>(protocol, proxy, false);

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -900,6 +900,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-7105. Fix TestJournalNode#testFailToStartWithBadConfig to match log
     output change. (Ray Chiang via cnauroth)
 
+    HDFS-7105. Allow falling back to a non-SASL connection on
+    DataTransferProtocol in several edge cases. (cnauroth)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

+ 15 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -22,8 +22,6 @@ import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension
     .EncryptedKeyVersion;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CIPHER_SUITE_KEY;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
@@ -90,6 +88,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -616,13 +615,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
         DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
         DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
     NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
+    AtomicBoolean nnFallbackToSimpleAuth = new AtomicBoolean(false);
     if (numResponseToDrop > 0) {
       // This case is used for testing.
       LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
           + " is set to " + numResponseToDrop
           + ", this hacked client will proactively drop responses");
       proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
-          nameNodeUri, ClientProtocol.class, numResponseToDrop);
+          nameNodeUri, ClientProtocol.class, numResponseToDrop,
+          nnFallbackToSimpleAuth);
     }
     
     if (proxyInfo != null) {
@@ -637,7 +638,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
       Preconditions.checkArgument(nameNodeUri != null,
           "null URI");
       proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
-          ClientProtocol.class);
+          ClientProtocol.class, nnFallbackToSimpleAuth);
       this.dtService = proxyInfo.getDelegationTokenService();
       this.namenode = proxyInfo.getProxy();
     }
@@ -675,10 +676,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
     this.saslClient = new SaslDataTransferClient(
       DataTransferSaslUtil.getSaslPropertiesResolver(conf),
-      TrustedChannelResolver.getInstance(conf),
-      conf.getBoolean(
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
+      TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
   }
   
   /**
@@ -3113,4 +3111,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public void setKeyProvider(KeyProviderCryptoExtension provider) {
     this.provider = provider;
   }
+
+  /**
+   * Returns the SaslDataTransferClient configured for this DFSClient.
+   *
+   * @return SaslDataTransferClient configured for this DFSClient
+   */
+  public SaslDataTransferClient getSaslDataTransferClient() {
+    return saslClient;
+  }
 }

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -589,6 +589,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
   public static final String DFS_TRUSTEDCHANNEL_RESOLVER_CLASS = "dfs.trustedchannel.resolver.class";
   public static final String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
+  public static final String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = "";
   public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
   public static final int    DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
   public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
@@ -703,4 +704,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT =
       1000;
 
+  public static final String IGNORE_SECURE_PORTS_FOR_TESTING_KEY =
+      "ignore.secure.ports.for.testing";
+  public static final boolean IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT = false;
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java

@@ -244,7 +244,7 @@ public class HAUtil {
     // Create the proxy provider. Actual proxy is not created.
     AbstractNNFailoverProxyProvider<ClientProtocol> provider = NameNodeProxies
         .createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class,
-        false);
+        false, null);
 
     // No need to use logical URI since failover is not configured.
     if (provider == null) {

+ 64 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java

@@ -36,6 +36,7 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -145,13 +146,37 @@ public class NameNodeProxies {
   @SuppressWarnings("unchecked")
   public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
       URI nameNodeUri, Class<T> xface) throws IOException {
+    return createProxy(conf, nameNodeUri, xface, null);
+  }
+
+  /**
+   * Creates the namenode proxy with the passed protocol. This will handle
+   * creation of either HA- or non-HA-enabled proxy objects, depending upon
+   * if the provided URI is a configured logical URI.
+   *
+   * @param conf the configuration containing the required IPC
+   *        properties, client failover configurations, etc.
+   * @param nameNodeUri the URI pointing either to a specific NameNode
+   *        or to a logical nameservice.
+   * @param xface the IPC interface which should be created
+   * @param fallbackToSimpleAuth set to true or false during calls to indicate if
+   *   a secure client falls back to simple auth
+   * @return an object containing both the proxy and the associated
+   *         delegation token service it corresponds to
+   * @throws IOException if there is an error creating the proxy
+   **/
+  @SuppressWarnings("unchecked")
+  public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
+      URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
+      throws IOException {
     AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
-        createFailoverProxyProvider(conf, nameNodeUri, xface, true);
+        createFailoverProxyProvider(conf, nameNodeUri, xface, true,
+          fallbackToSimpleAuth);
   
     if (failoverProxyProvider == null) {
       // Non-HA case
       return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
-          UserGroupInformation.getCurrentUser(), true);
+          UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
     } else {
       // HA case
       Conf config = new Conf(conf);
@@ -187,6 +212,8 @@ public class NameNodeProxies {
    *        or to a logical nameservice.
    * @param xface the IPC interface which should be created
    * @param numResponseToDrop The number of responses to drop for each RPC call
+   * @param fallbackToSimpleAuth set to true or false during calls to indicate if
+   *   a secure client falls back to simple auth
    * @return an object containing both the proxy and the associated
    *         delegation token service it corresponds to. Will return null of the
    *         given configuration does not support HA.
@@ -195,10 +222,12 @@ public class NameNodeProxies {
   @SuppressWarnings("unchecked")
   public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
       Configuration config, URI nameNodeUri, Class<T> xface,
-      int numResponseToDrop) throws IOException {
+      int numResponseToDrop, AtomicBoolean fallbackToSimpleAuth)
+      throws IOException {
     Preconditions.checkArgument(numResponseToDrop > 0);
     AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
-        createFailoverProxyProvider(config, nameNodeUri, xface, true);
+        createFailoverProxyProvider(config, nameNodeUri, xface, true,
+          fallbackToSimpleAuth);
 
     if (failoverProxyProvider != null) { // HA case
       int delay = config.getInt(
@@ -257,12 +286,35 @@ public class NameNodeProxies {
   public static <T> ProxyAndInfo<T> createNonHAProxy(
       Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
       UserGroupInformation ugi, boolean withRetries) throws IOException {
+    return createNonHAProxy(conf, nnAddr, xface, ugi, withRetries, null);
+  }
+
+  /**
+   * Creates an explicitly non-HA-enabled proxy object. Most of the time you
+   * don't want to use this, and should instead use {@link NameNodeProxies#createProxy}.
+   *
+   * @param conf the configuration object
+   * @param nnAddr address of the remote NN to connect to
+   * @param xface the IPC interface which should be created
+   * @param ugi the user who is making the calls on the proxy object
+   * @param withRetries certain interfaces have a non-standard retry policy
+   * @param fallbackToSimpleAuth - set to true or false during this method to
+   *   indicate if a secure client falls back to simple auth
+   * @return an object containing both the proxy and the associated
+   *         delegation token service it corresponds to
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> ProxyAndInfo<T> createNonHAProxy(
+      Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
+      UserGroupInformation ugi, boolean withRetries,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException {
     Text dtService = SecurityUtil.buildTokenService(nnAddr);
   
     T proxy;
     if (xface == ClientProtocol.class) {
       proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,
-          withRetries);
+          withRetries, fallbackToSimpleAuth);
     } else if (xface == JournalProtocol.class) {
       proxy = (T) createNNProxyWithJournalProtocol(nnAddr, conf, ugi);
     } else if (xface == NamenodeProtocol.class) {
@@ -351,7 +403,8 @@ public class NameNodeProxies {
   
   private static ClientProtocol createNNProxyWithClientProtocol(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
-      boolean withRetries) throws IOException {
+      boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
+      throws IOException {
     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
 
     final RetryPolicy defaultPolicy = 
@@ -367,8 +420,8 @@ public class NameNodeProxies {
     ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
         ClientNamenodeProtocolPB.class, version, address, ugi, conf,
         NetUtils.getDefaultSocketFactory(conf),
-        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy)
-            .getProxy();
+        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
+        fallbackToSimpleAuth).getProxy();
 
     if (withRetries) { // create the proxy with retries
 
@@ -440,8 +493,8 @@ public class NameNodeProxies {
   /** Creates the Failover proxy provider instance*/
   @VisibleForTesting
   public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider(
-      Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort)
-      throws IOException {
+      Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort,
+      AtomicBoolean fallbackToSimpleAuth) throws IOException {
     Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null;
     AbstractNNFailoverProxyProvider<T> providerNN;
     Preconditions.checkArgument(
@@ -490,6 +543,7 @@ public class NameNodeProxies {
             + " and does not use port information.");
       }
     }
+    providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth);
     return providerNN;
   }
 

+ 32 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java

@@ -28,6 +28,7 @@ import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
@@ -71,21 +72,38 @@ public class SaslDataTransferClient {
   private static final Logger LOG = LoggerFactory.getLogger(
     SaslDataTransferClient.class);
 
-  private final boolean fallbackToSimpleAuthAllowed;
+  private final AtomicBoolean fallbackToSimpleAuth;
   private final SaslPropertiesResolver saslPropsResolver;
   private final TrustedChannelResolver trustedChannelResolver;
 
+  /**
+   * Creates a new SaslDataTransferClient.  This constructor is used in cases
+   * where it is not relevant to track if a secure client did a fallback to
+   * simple auth.  For intra-cluster connections between data nodes in the same
+   * cluster, we can assume that all run under the same security configuration.
+   *
+   * @param saslPropsResolver for determining properties of SASL negotiation
+   * @param trustedChannelResolver for identifying trusted connections that do
+   *   not require SASL negotiation
+   */
+  public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
+      TrustedChannelResolver trustedChannelResolver) {
+    this(saslPropsResolver, trustedChannelResolver, null);
+  }
+
   /**
    * Creates a new SaslDataTransferClient.
    *
    * @param saslPropsResolver for determining properties of SASL negotiation
    * @param trustedChannelResolver for identifying trusted connections that do
    *   not require SASL negotiation
+   * @param fallbackToSimpleAuth checked on each attempt at general SASL
+   *   handshake, if true forces use of simple auth
    */
   public SaslDataTransferClient(SaslPropertiesResolver saslPropsResolver,
       TrustedChannelResolver trustedChannelResolver,
-      boolean fallbackToSimpleAuthAllowed) {
-    this.fallbackToSimpleAuthAllowed = fallbackToSimpleAuthAllowed;
+      AtomicBoolean fallbackToSimpleAuth) {
+    this.fallbackToSimpleAuth = fallbackToSimpleAuth;
     this.saslPropsResolver = saslPropsResolver;
     this.trustedChannelResolver = trustedChannelResolver;
   }
@@ -221,22 +239,26 @@ public class SaslDataTransferClient {
         "SASL client skipping handshake in secured configuration with "
         + "privileged port for addr = {}, datanodeId = {}", addr, datanodeId);
       return null;
-    } else if (accessToken.getIdentifier().length == 0) {
-      if (!fallbackToSimpleAuthAllowed) {
-        throw new IOException(
-          "No block access token was provided (insecure cluster), but this " +
-          "client is configured to allow only secure connections.");
-      }
+    } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
       LOG.debug(
         "SASL client skipping handshake in secured configuration with "
         + "unsecured cluster for addr = {}, datanodeId = {}", addr, datanodeId);
       return null;
-    } else {
+    } else if (saslPropsResolver != null) {
       LOG.debug(
         "SASL client doing general handshake for addr = {}, datanodeId = {}",
         addr, datanodeId);
       return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken,
         datanodeId);
+    } else {
+      // It's a secured cluster using non-privileged ports, but no SASL.  The
+      // only way this can happen is if the DataNode has
+      // ignore.secure.ports.for.testing configured, so this is a rare edge case.
+      LOG.debug(
+        "SASL client skipping handshake in secured configuration with no SASL "
+        + "protection configured for addr = {}, datanodeId = {}",
+        addr, datanodeId);
+      return null;
     }
   }
 
@@ -348,12 +370,6 @@ public class SaslDataTransferClient {
       OutputStream underlyingOut, InputStream underlyingIn,
       Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
       throws IOException {
-    if (saslPropsResolver == null) {
-      throw new IOException(String.format("Cannot create a secured " +
-        "connection if DataNode listens on unprivileged port (%d) and no " +
-        "protection is defined in configuration property %s.",
-        datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY));
-    }
     Map<String, String> saslProps = saslPropsResolver.getClientProperties(addr);
 
     String userName = buildUserName(accessToken);

+ 19 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java

@@ -112,11 +112,29 @@ public class SaslDataTransferServer {
         "SASL server skipping handshake in unsecured configuration for "
         + "peer = {}, datanodeId = {}", peer, datanodeId);
       return new IOStreamPair(underlyingIn, underlyingOut);
-    } else {
+    } else if (dnConf.getSaslPropsResolver() != null) {
       LOG.debug(
         "SASL server doing general handshake for peer = {}, datanodeId = {}",
         peer, datanodeId);
       return getSaslStreams(peer, underlyingOut, underlyingIn, datanodeId);
+    } else if (dnConf.getIgnoreSecurePortsForTesting()) {
+      // It's a secured cluster using non-privileged ports, but no SASL.  The
+      // only way this can happen is if the DataNode has
+      // ignore.secure.ports.for.testing configured, so this is a rare edge case.
+      LOG.debug(
+        "SASL server skipping handshake in secured configuration with no SASL "
+        + "protection configured for peer = {}, datanodeId = {}",
+        peer, datanodeId);
+      return new IOStreamPair(underlyingIn, underlyingOut);
+    } else {
+      // The error message here intentionally does not mention
+      // ignore.secure.ports.for.testing.  That's intended for dev use only.
+      // This code path is not expected to execute ever, because DataNode startup
+      // checks for invalid configuration and aborts.
+      throw new IOException(String.format("Cannot create a secured " +
+        "connection if DataNode listens on unprivileged port (%d) and no " +
+        "protection is defined in configuration property %s.",
+        datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY));
     }
   }
 
@@ -257,12 +275,6 @@ public class SaslDataTransferServer {
   private IOStreamPair getSaslStreams(Peer peer, OutputStream underlyingOut,
       InputStream underlyingIn, final DatanodeID datanodeId) throws IOException {
     SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver();
-    if (saslPropsResolver == null) {
-      throw new IOException(String.format("Cannot create a secured " +
-        "connection if DataNode listens on unprivileged port (%d) and no " +
-        "protection is defined in configuration property %s.",
-        datanodeId.getXferPort(), DFS_DATA_TRANSFER_PROTECTION_KEY));
-    }
     Map<String, String> saslProps = saslPropsResolver.getServerProperties(
       getPeerAddress(peer));
 

+ 1 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java

@@ -48,8 +48,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.StorageType;
@@ -787,12 +785,9 @@ public class Dispatcher {
         : Executors.newFixedThreadPool(dispatcherThreads);
     this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
 
-    final boolean fallbackToSimpleAuthAllowed = conf.getBoolean(
-        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
-        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
     this.saslClient = new SaslDataTransferClient(
         DataTransferSaslUtil.getSaslPropertiesResolver(conf),
-        TrustedChannelResolver.getInstance(conf), fallbackToSimpleAuthAllowed);
+        TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
   }
 
   public DistributedFileSystem getDistributedFileSystem() {

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java

@@ -27,6 +27,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -101,6 +102,7 @@ public class NameNodeConnector implements Closeable {
   private final NamenodeProtocol namenode;
   private final ClientProtocol client;
   private final KeyManager keyManager;
+  final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
 
   private final DistributedFileSystem fs;
   private final Path idPath;
@@ -120,7 +122,7 @@ public class NameNodeConnector implements Closeable {
     this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
         NamenodeProtocol.class).getProxy();
     this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
-        ClientProtocol.class).getProxy();
+        ClientProtocol.class, fallbackToSimpleAuth).getProxy();
     this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
 
     final NamespaceInfo namespaceinfo = namenode.versionRequest();

+ 17 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java

@@ -48,6 +48,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEF
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -90,6 +92,7 @@ public class DNConf {
   final String encryptionAlgorithm;
   final SaslPropertiesResolver saslPropsResolver;
   final TrustedChannelResolver trustedChannelResolver;
+  private final boolean ignoreSecurePortsForTesting;
   
   final long xceiverStopTimeout;
   final long restartReplicaExpiry;
@@ -173,6 +176,9 @@ public class DNConf {
     this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
     this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
       conf);
+    this.ignoreSecurePortsForTesting = conf.getBoolean(
+        IGNORE_SECURE_PORTS_FOR_TESTING_KEY,
+        IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT);
     
     this.xceiverStopTimeout = conf.getLong(
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
@@ -238,4 +244,15 @@ public class DNConf {
   public TrustedChannelResolver getTrustedChannelResolver() {
     return trustedChannelResolver;
   }
+
+  /**
+   * Returns true if configuration is set to skip checking for proper
+   * port configuration in a secured cluster.  This is only intended for use in
+   * dev testing.
+   *
+   * @return true if configured to skip checking secured port configuration
+   */
+  public boolean getIgnoreSecurePortsForTesting() {
+    return ignoreSecurePortsForTesting;
+  }
 }

+ 13 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
@@ -46,9 +44,12 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import java.io.BufferedOutputStream;
@@ -170,6 +171,7 @@ import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.SaslPropertiesResolver;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -967,8 +969,6 @@ public class DataNode extends ReconfigurableBase
                      SecureResources resources
                      ) throws IOException {
 
-    checkSecureConfig(conf, resources);
-
     // settings global for all BPs in the Data Node
     this.secureResources = resources;
     synchronized (this) {
@@ -976,6 +976,8 @@ public class DataNode extends ReconfigurableBase
     }
     this.conf = conf;
     this.dnConf = new DNConf(conf);
+    checkSecureConfig(dnConf, conf, resources);
+
     this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
 
     if (dnConf.maxLockedMemory > 0) {
@@ -1031,10 +1033,7 @@ public class DataNode extends ReconfigurableBase
     // exit without having to explicitly shutdown its thread pool.
     readaheadPool = ReadaheadPool.getInstance();
     saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
-      dnConf.trustedChannelResolver,
-      conf.getBoolean(
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
+      dnConf.trustedChannelResolver);
     saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
   }
 
@@ -1054,23 +1053,24 @@ public class DataNode extends ReconfigurableBase
    * must check if the target port is a privileged port, and if so, skip the
    * SASL handshake.
    *
+   * @param dnConf DNConf to check
    * @param conf Configuration to check
    * @param resources SecuredResources obtained for DataNode
    * @throws RuntimeException if security enabled, but configuration is insecure
    */
-  private static void checkSecureConfig(Configuration conf,
+  private static void checkSecureConfig(DNConf dnConf, Configuration conf,
       SecureResources resources) throws RuntimeException {
     if (!UserGroupInformation.isSecurityEnabled()) {
       return;
     }
-    String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
-    if (resources != null && dataTransferProtection == null) {
+    SaslPropertiesResolver saslPropsResolver = dnConf.getSaslPropsResolver();
+    if (resources != null && saslPropsResolver == null) {
       return;
     }
-    if (conf.getBoolean("ignore.secure.ports.for.testing", false)) {
+    if (dnConf.getIgnoreSecurePortsForTesting()) {
       return;
     }
-    if (dataTransferProtection != null &&
+    if (saslPropsResolver != null &&
         DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY &&
         resources == null) {
       return;

+ 4 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -59,10 +56,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
-import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
@@ -161,7 +155,6 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
   private List<String> snapshottableDirs = null;
 
   private final BlockPlacementPolicy bpPolicy;
-  private final SaslDataTransferClient saslClient;
 
   /**
    * Filesystem checker.
@@ -188,12 +181,6 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
         networktopology,
         namenode.getNamesystem().getBlockManager().getDatanodeManager()
         .getHost2DatanodeMap());
-    this.saslClient = new SaslDataTransferClient(
-      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
-      TrustedChannelResolver.getInstance(conf),
-      conf.getBoolean(
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
-        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
     
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
@@ -594,7 +581,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
    * bad. Both places should be refactored to provide a method to copy blocks
    * around.
    */
-  private void copyBlock(DFSClient dfs, LocatedBlock lblock,
+  private void copyBlock(final DFSClient dfs, LocatedBlock lblock,
                          OutputStream fos) throws Exception {
     int failures = 0;
     InetSocketAddress targetAddr = null;
@@ -647,8 +634,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
                 try {
                   s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
                   s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-                  peer = TcpPeerServer.peerFromSocketAndKey(saslClient, s,
-                        NamenodeFsck.this, blockToken, datanodeId);
+                  peer = TcpPeerServer.peerFromSocketAndKey(
+                        dfs.getSaslDataTransferClient(), s, NamenodeFsck.this,
+                        blockToken, datanodeId);
                 } finally {
                   if (peer == null) {
                     IOUtils.closeQuietly(s);

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java

@@ -18,12 +18,16 @@
 
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 
 public abstract class AbstractNNFailoverProxyProvider<T> implements
    FailoverProxyProvider <T> {
 
+  protected AtomicBoolean fallbackToSimpleAuth;
+
   /**
    * Inquire whether logical HA URI is used for the implementation. If it is
    * used, a special token handling may be needed to make sure a token acquired 
@@ -32,4 +36,14 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
    * @return true if logical HA URI is used. false, if not used.
    */
   public abstract boolean useLogicalURI(); 
+
+  /**
+   * Set for tracking if a secure client falls back to simple auth.
+   *
+   * @param fallbackToSimpleAuth - set to true or false during this method to
+   *   indicate if a secure client falls back to simple auth
+   */
+  public void setFallbackToSimpleAuth(AtomicBoolean fallbackToSimpleAuth) {
+    this.fallbackToSimpleAuth = fallbackToSimpleAuth;
+  }
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java

@@ -122,7 +122,7 @@ public class ConfiguredFailoverProxyProvider<T> extends
     if (current.namenode == null) {
       try {
         current.namenode = NameNodeProxies.createNonHAProxy(conf,
-            current.address, xface, ugi, false).getProxy();
+            current.address, xface, ugi, false, fallbackToSimpleAuth).getProxy();
       } catch (IOException e) {
         LOG.error("Failed to create RPC proxy to NameNode", e);
         throw new RuntimeException(e);

+ 34 - 11
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/TestSaslDataTransfer.java

@@ -18,6 +18,9 @@
 package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
+
 import static org.junit.Assert.*;
 
 import java.io.IOException;
@@ -29,11 +32,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
 
 public class TestSaslDataTransfer extends SaslDataTransferTestCase {
 
@@ -49,6 +54,9 @@ public class TestSaslDataTransfer extends SaslDataTransferTestCase {
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
+  @Rule
+  public Timeout timeout = new Timeout(60000);
+
   @After
   public void shutdown() {
     IOUtils.cleanup(null, fs);
@@ -98,17 +106,6 @@ public class TestSaslDataTransfer extends SaslDataTransferTestCase {
     doTest(clientConf);
   }
 
-  @Test
-  public void testClientSaslNoServerSasl() throws Exception {
-    HdfsConfiguration clusterConf = createSecureConfig("");
-    startCluster(clusterConf);
-    HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
-    clientConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
-    exception.expect(IOException.class);
-    exception.expectMessage("could only be replicated to 0 nodes");
-    doTest(clientConf);
-  }
-
   @Test
   public void testServerSaslNoClientSasl() throws Exception {
     HdfsConfiguration clusterConf = createSecureConfig(
@@ -121,6 +118,32 @@ public class TestSaslDataTransfer extends SaslDataTransferTestCase {
     doTest(clientConf);
   }
 
+  @Test
+  public void testDataNodeAbortsIfNoSasl() throws Exception {
+    HdfsConfiguration clusterConf = createSecureConfig("");
+    exception.expect(RuntimeException.class);
+    exception.expectMessage("Cannot start secure DataNode");
+    startCluster(clusterConf);
+  }
+
+  @Test
+  public void testDataNodeAbortsIfNotHttpsOnly() throws Exception {
+    HdfsConfiguration clusterConf = createSecureConfig("authentication");
+    clusterConf.set(DFS_HTTP_POLICY_KEY,
+      HttpConfig.Policy.HTTP_AND_HTTPS.name());
+    exception.expect(RuntimeException.class);
+    exception.expectMessage("Cannot start secure DataNode");
+    startCluster(clusterConf);
+  }
+
+  @Test
+  public void testNoSaslAndSecurePortsIgnored() throws Exception {
+    HdfsConfiguration clusterConf = createSecureConfig("");
+    clusterConf.setBoolean(IGNORE_SECURE_PORTS_FOR_TESTING_KEY, true);
+    startCluster(clusterConf);
+    doTest(clusterConf);
+  }
+
   /**
    * Tests DataTransferProtocol with the given client configuration.
    *

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java

@@ -194,7 +194,7 @@ public class TestRetryCacheWithHA {
     URI nnUri = dfs.getUri();
     FailoverProxyProvider<ClientProtocol> failoverProxyProvider = 
         NameNodeProxies.createFailoverProxyProvider(conf, 
-            nnUri, ClientProtocol.class, true);
+            nnUri, ClientProtocol.class, true, null);
     InvocationHandler dummyHandler = new DummyRetryInvocationHandler(
         failoverProxyProvider, RetryPolicies
         .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,