Browse Source

HADOOP-17975. Fallback to simple auth does not work for a secondary DistributedFileSystem instance. (#3579)

(cherry picked from commit ae3ba45db58467ce57b0a440e236fd80f6be9ec6)
Istvan Fajth 3 years ago
parent
commit
dbcadcc21e

+ 47 - 24
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -807,17 +807,18 @@ public class Client implements AutoCloseable {
      */
     private synchronized void setupIOstreams(
         AtomicBoolean fallbackToSimpleAuth) {
-      if (socket != null || shouldCloseConnection.get()) {
-        return;
-      }
-      UserGroupInformation ticket = remoteId.getTicket();
-      if (ticket != null) {
-        final UserGroupInformation realUser = ticket.getRealUser();
-        if (realUser != null) {
-          ticket = realUser;
-        }
-      }
       try {
+        if (socket != null || shouldCloseConnection.get()) {
+          setFallBackToSimpleAuth(fallbackToSimpleAuth);
+          return;
+        }
+        UserGroupInformation ticket = remoteId.getTicket();
+        if (ticket != null) {
+          final UserGroupInformation realUser = ticket.getRealUser();
+          if (realUser != null) {
+            ticket = realUser;
+          }
+        }
         connectingThread.set(Thread.currentThread());
         if (LOG.isDebugEnabled()) {
           LOG.debug("Connecting to "+server);
@@ -863,20 +864,8 @@ public class Client implements AutoCloseable {
               remoteId.saslQop =
                   (String)saslRpcClient.getNegotiatedProperty(Sasl.QOP);
               LOG.debug("Negotiated QOP is :" + remoteId.saslQop);
-              if (fallbackToSimpleAuth != null) {
-                fallbackToSimpleAuth.set(false);
-              }
-            } else if (UserGroupInformation.isSecurityEnabled()) {
-              if (!fallbackAllowed) {
-                throw new AccessControlException(
-                    "Server asks us to fall back to SIMPLE " +
-                    "auth, but this client is configured to only allow secure " +
-                    "connections.");
-              }
-              if (fallbackToSimpleAuth != null) {
-                fallbackToSimpleAuth.set(true);
-              }
             }
+            setFallBackToSimpleAuth(fallbackToSimpleAuth);
           }
 
           if (doPing) {
@@ -909,7 +898,41 @@ public class Client implements AutoCloseable {
         connectingThread.set(null);
       }
     }
-    
+
+    private void setFallBackToSimpleAuth(AtomicBoolean fallbackToSimpleAuth)
+        throws AccessControlException {
+      if (authMethod == null || authProtocol != AuthProtocol.SASL) {
+        if (authProtocol == AuthProtocol.SASL) {
+          LOG.trace("Auth method is not set, yield from setting auth fallback.");
+        }
+        return;
+      }
+      if (fallbackToSimpleAuth == null) {
+        // this should happen only during testing.
+        LOG.trace("Connection {} will skip to set fallbackToSimpleAuth as it is null.", remoteId);
+      } else {
+        if (fallbackToSimpleAuth.get()) {
+          // we already set the value to true, we do not need to examine again.
+          return;
+        }
+      }
+      if (authMethod != AuthMethod.SIMPLE) {
+        if (fallbackToSimpleAuth != null) {
+          LOG.trace("Disabling fallbackToSimpleAuth, target does not use SIMPLE authentication.");
+          fallbackToSimpleAuth.set(false);
+        }
+      } else if (UserGroupInformation.isSecurityEnabled()) {
+        if (!fallbackAllowed) {
+          throw new AccessControlException("Server asks us to fall back to SIMPLE auth, but this "
+              + "client is configured to only allow secure connections.");
+        }
+        if (fallbackToSimpleAuth != null) {
+          LOG.trace("Enabling fallbackToSimpleAuth for target, as we are allowed to fall back.");
+          fallbackToSimpleAuth.set(true);
+        }
+      }
+    }
+
     private void closeConnection() {
       if (socket == null) {
         return;

+ 11 - 9
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java

@@ -22,6 +22,7 @@ import com.google.protobuf.BlockingService;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
@@ -124,18 +125,19 @@ public class TestRpcBase {
     return server;
   }
 
-  protected static TestRpcService getClient(InetSocketAddress serverAddr,
-                                     Configuration clientConf)
+  protected static TestRpcService getClient(InetSocketAddress serverAddr, Configuration clientConf)
       throws ServiceException {
-    try {
-      return RPC.getProxy(TestRpcService.class, 0, serverAddr, clientConf);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
+    return getClient(serverAddr, clientConf, null);
+  }
+
+  protected static TestRpcService getClient(InetSocketAddress serverAddr,
+      Configuration clientConf, RetryPolicy connectionRetryPolicy) throws ServiceException {
+    return getClient(serverAddr, clientConf, connectionRetryPolicy, null);
   }
 
   protected static TestRpcService getClient(InetSocketAddress serverAddr,
-      Configuration clientConf, final RetryPolicy connectionRetryPolicy)
+      Configuration clientConf, final RetryPolicy connectionRetryPolicy,
+      AtomicBoolean fallbackToSimpleAuth)
       throws ServiceException {
     try {
       return RPC.getProtocolProxy(
@@ -146,7 +148,7 @@ public class TestRpcBase {
           clientConf,
           NetUtils.getDefaultSocketFactory(clientConf),
           RPC.getRpcTimeout(clientConf),
-          connectionRetryPolicy, null).getProxy();
+          connectionRetryPolicy, fallbackToSimpleAuth).getProxy();
     } catch (IOException e) {
       throw new ServiceException(e);
     }

+ 159 - 53
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java

@@ -72,6 +72,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
@@ -569,6 +570,72 @@ public class TestSaslRPC extends TestRpcBase {
     assertAuthEquals(SIMPLE,    getAuthMethod(KERBEROS, SIMPLE, UseToken.OTHER));
   }
 
+  /**
+   * In DfsClient there is a fallback mechanism to simple auth, which passes in an atomic boolean
+   * to the ipc Client, which then sets it during setupIOStreams.
+   * SetupIOStreams were running only once per connection, so if two separate DfsClient was
+   * instantiated, then due to the connection caching inside the ipc client, the second DfsClient
+   * did not have the passed in atomic boolean set properly if the first client was not yet closed,
+   * as setupIOStreams was yielding to set up new streams as it has reused the already existing
+   * connection.
+   * This test mimics this behaviour, and asserts the fallback whether it is set correctly.
+   * @see <a href="https://issues.apache.org/jira/browse/HADOOP-17975">HADOOP-17975</a>
+   */
+  @Test
+  public void testClientFallbackToSimpleAuthForASecondClient() throws Exception {
+    Configuration serverConf = createConfForAuth(SIMPLE);
+    Server server = startServer(serverConf,
+        setupServerUgi(SIMPLE, serverConf),
+        createServerSecretManager(SIMPLE, new TestTokenSecretManager()));
+    final InetSocketAddress serverAddress = NetUtils.getConnectAddress(server);
+
+    clientFallBackToSimpleAllowed = true;
+    Configuration clientConf = createConfForAuth(KERBEROS);
+    UserGroupInformation clientUgi = setupClientUgi(KERBEROS, clientConf);
+
+    AtomicBoolean fallbackToSimpleAuth1 = new AtomicBoolean();
+    AtomicBoolean fallbackToSimpleAuth2 = new AtomicBoolean();
+    try {
+      LOG.info("trying ugi:"+ clientUgi +" tokens:"+ clientUgi.getTokens());
+      clientUgi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        TestRpcService proxy1 = null;
+        TestRpcService proxy2 = null;
+        try {
+          proxy1 = getClient(serverAddress, clientConf, null, fallbackToSimpleAuth1);
+          proxy1.ping(null, newEmptyRequest());
+          // make sure the other side thinks we are who we said we are!!!
+          assertEquals(clientUgi.getUserName(),
+              proxy1.getAuthUser(null, newEmptyRequest()).getUser());
+          AuthMethod authMethod =
+              convert(proxy1.getAuthMethod(null, newEmptyRequest()));
+          assertAuthEquals(SIMPLE, authMethod.toString());
+
+          proxy2 = getClient(serverAddress, clientConf, null, fallbackToSimpleAuth2);
+          proxy2.ping(null, newEmptyRequest());
+          // make sure the other side thinks we are who we said we are!!!
+          assertEquals(clientUgi.getUserName(),
+              proxy2.getAuthUser(null, newEmptyRequest()).getUser());
+          AuthMethod authMethod2 =
+              convert(proxy2.getAuthMethod(null, newEmptyRequest()));
+          assertAuthEquals(SIMPLE, authMethod2.toString());
+        } finally {
+          if (proxy1 != null) {
+            RPC.stopProxy(proxy1);
+          }
+          if (proxy2 != null) {
+            RPC.stopProxy(proxy2);
+          }
+        }
+        return null;
+      });
+    } finally {
+      server.stop();
+    }
+
+    assertTrue("First client does not set to fall back properly.", fallbackToSimpleAuth1.get());
+    assertTrue("Second client does not set to fall back properly.", fallbackToSimpleAuth2.get());
+  }
+
   @Test
   public void testNoClientFallbackToSimple()
       throws Exception {
@@ -815,22 +882,44 @@ public class TestSaslRPC extends TestRpcBase {
       return e.toString();
     }
   }
-  
+
   private String internalGetAuthMethod(
       final AuthMethod clientAuth,
       final AuthMethod serverAuth,
       final UseToken tokenType) throws Exception {
-    
-    final Configuration serverConf = new Configuration(conf);
-    serverConf.set(HADOOP_SECURITY_AUTHENTICATION, serverAuth.toString());
-    UserGroupInformation.setConfiguration(serverConf);
-    
-    final UserGroupInformation serverUgi = (serverAuth == KERBEROS)
-        ? UserGroupInformation.createRemoteUser("server/localhost@NONE")
-        : UserGroupInformation.createRemoteUser("server");
-    serverUgi.setAuthenticationMethod(serverAuth);
 
     final TestTokenSecretManager sm = new TestTokenSecretManager();
+
+    Configuration serverConf = createConfForAuth(serverAuth);
+    Server server = startServer(
+        serverConf,
+        setupServerUgi(serverAuth, serverConf),
+        createServerSecretManager(serverAuth, sm));
+    final InetSocketAddress serverAddress = NetUtils.getConnectAddress(server);
+
+    final Configuration clientConf = createConfForAuth(clientAuth);
+    final UserGroupInformation clientUgi = setupClientUgi(clientAuth, clientConf);
+
+    setupTokenIfNeeded(tokenType, sm, clientUgi, serverAddress);
+
+    try {
+      return createClientAndQueryAuthMethod(serverAddress, clientConf, clientUgi, null);
+    } finally {
+      server.stop();
+    }
+  }
+
+  private Configuration createConfForAuth(AuthMethod clientAuth) {
+    final Configuration clientConf = new Configuration(conf);
+    clientConf.set(HADOOP_SECURITY_AUTHENTICATION, clientAuth.toString());
+    clientConf.setBoolean(
+        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        clientFallBackToSimpleAllowed);
+    return clientConf;
+  }
+
+  private SecretManager<?> createServerSecretManager(
+      AuthMethod serverAuth, TestTokenSecretManager sm) {
     boolean useSecretManager = (serverAuth != SIMPLE);
     if (enableSecretManager != null) {
       useSecretManager &= enableSecretManager;
@@ -839,26 +928,43 @@ public class TestSaslRPC extends TestRpcBase {
       useSecretManager |= forceSecretManager;
     }
     final SecretManager<?> serverSm = useSecretManager ? sm : null;
+    return serverSm;
+  }
 
+  private Server startServer(Configuration serverConf, UserGroupInformation serverUgi,
+      SecretManager<?> serverSm) throws IOException, InterruptedException {
     Server server = serverUgi.doAs(new PrivilegedExceptionAction<Server>() {
       @Override
       public Server run() throws IOException {
         return setupTestServer(serverConf, 5, serverSm);
       }
     });
+    return server;
+  }
 
-    final Configuration clientConf = new Configuration(conf);
-    clientConf.set(HADOOP_SECURITY_AUTHENTICATION, clientAuth.toString());
-    clientConf.setBoolean(
-        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
-        clientFallBackToSimpleAllowed);
+  private UserGroupInformation setupServerUgi(AuthMethod serverAuth,
+      Configuration serverConf) {
+    UserGroupInformation.setConfiguration(serverConf);
+
+    final UserGroupInformation serverUgi = (serverAuth == KERBEROS)
+        ? UserGroupInformation.createRemoteUser("server/localhost@NONE")
+        : UserGroupInformation.createRemoteUser("server");
+    serverUgi.setAuthenticationMethod(serverAuth);
+    return serverUgi;
+  }
+
+  private UserGroupInformation setupClientUgi(AuthMethod clientAuth,
+      Configuration clientConf) {
     UserGroupInformation.setConfiguration(clientConf);
-    
+
     final UserGroupInformation clientUgi =
         UserGroupInformation.createRemoteUser("client");
-    clientUgi.setAuthenticationMethod(clientAuth);    
+    clientUgi.setAuthenticationMethod(clientAuth);
+    return clientUgi;
+  }
 
-    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+  private void setupTokenIfNeeded(UseToken tokenType, TestTokenSecretManager sm,
+      UserGroupInformation clientUgi, InetSocketAddress addr) {
     if (tokenType != UseToken.NONE) {
       TestTokenIdentifier tokenId = new TestTokenIdentifier(
           new Text(clientUgi.getUserName()));
@@ -881,44 +987,44 @@ public class TestSaslRPC extends TestRpcBase {
       }
       clientUgi.addToken(token);
     }
+  }
 
-    try {
-      LOG.info("trying ugi:"+clientUgi+" tokens:"+clientUgi.getTokens());
-      return clientUgi.doAs(new PrivilegedExceptionAction<String>() {
-        @Override
-        public String run() throws IOException {
-          TestRpcService proxy = null;
-          try {
-            proxy = getClient(addr, clientConf);
-
-            proxy.ping(null, newEmptyRequest());
-            // make sure the other side thinks we are who we said we are!!!
-            assertEquals(clientUgi.getUserName(),
-                proxy.getAuthUser(null, newEmptyRequest()).getUser());
-            AuthMethod authMethod =
-                convert(proxy.getAuthMethod(null, newEmptyRequest()));
-            // verify sasl completed with correct QOP
-            assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null,
-                         RPC.getConnectionIdForProxy(proxy).getSaslQop());
-            return authMethod != null ? authMethod.toString() : null;
-          } catch (ServiceException se) {
-            if (se.getCause() instanceof RemoteException) {
-              throw (RemoteException) se.getCause();
-            } else if (se.getCause() instanceof IOException) {
-              throw (IOException) se.getCause();
-            } else {
-              throw new RuntimeException(se.getCause());
-            }
-          } finally {
-            if (proxy != null) {
-              RPC.stopProxy(proxy);
-            }
+  private String createClientAndQueryAuthMethod(InetSocketAddress serverAddress,
+      Configuration clientConf, UserGroupInformation clientUgi, AtomicBoolean fallbackToSimpleAuth)
+      throws IOException, InterruptedException {
+    LOG.info("trying ugi:"+ clientUgi +" tokens:"+ clientUgi.getTokens());
+    return clientUgi.doAs(new PrivilegedExceptionAction<String>() {
+      @Override
+      public String run() throws IOException {
+        TestRpcService proxy = null;
+        try {
+          proxy = getClient(serverAddress, clientConf, null, fallbackToSimpleAuth);
+
+          proxy.ping(null, newEmptyRequest());
+          // make sure the other side thinks we are who we said we are!!!
+          assertEquals(clientUgi.getUserName(),
+              proxy.getAuthUser(null, newEmptyRequest()).getUser());
+          AuthMethod authMethod =
+              convert(proxy.getAuthMethod(null, newEmptyRequest()));
+          // verify sasl completed with correct QOP
+          assertEquals((authMethod != SIMPLE) ? expectedQop.saslQop : null,
+              RPC.getConnectionIdForProxy(proxy).getSaslQop());
+          return authMethod != null ? authMethod.toString() : null;
+        } catch (ServiceException se) {
+          if (se.getCause() instanceof RemoteException) {
+            throw (RemoteException) se.getCause();
+          } else if (se.getCause() instanceof IOException) {
+            throw (IOException) se.getCause();
+          } else {
+            throw new RuntimeException(se.getCause());
+          }
+        } finally {
+          if (proxy != null) {
+            RPC.stopProxy(proxy);
           }
         }
-      });
-    } finally {
-      server.stop();
-    }
+      }
+    });
   }
 
   private static void assertAuthEquals(AuthMethod expect,