Przeglądaj źródła

YARN-6093. Minor bugs with AMRMtoken renewal and state store availability when using FederationRMFailoverProxyProvider during RM failover. (Botong Huang via Subru).

(cherry picked from commit 66500f4fa6155d29435d7c92fd6d68079c4cab86)
(cherry picked from commit 98b45b0ed34a060e0a529069cd15676d91600dff)
Subru Krishnan 8 lat temu
rodzic
commit
ce9110ab5c

+ 69 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java

@@ -19,17 +19,21 @@ package org.apache.hadoop.yarn.client;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
+import org.apache.hadoop.yarn.server.federation.failover.FederationRMFailoverProxyProvider;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -44,6 +48,10 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * Unit tests for FederationRMFailoverProxyProvider.
  */
@@ -151,4 +159,65 @@ public class TestFederationRMFailoverProxyProvider {
     }
   }
 
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testUGIForProxyCreation()
+      throws IOException, InterruptedException {
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    UserGroupInformation user1 =
+        UserGroupInformation.createProxyUser("user1", currentUser);
+    UserGroupInformation user2 =
+        UserGroupInformation.createProxyUser("user2", currentUser);
+
+    final TestableFederationRMFailoverProxyProvider provider =
+        new TestableFederationRMFailoverProxyProvider();
+
+    InetSocketAddress addr =
+        conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+    final ClientRMProxy rmProxy = mock(ClientRMProxy.class);
+    when(rmProxy.getRMAddress(any(YarnConfiguration.class), any(Class.class)))
+        .thenReturn(addr);
+
+    user1.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() {
+        provider.init(conf, rmProxy, ApplicationMasterProtocol.class);
+        return null;
+      }
+    });
+
+    final ProxyInfo currentProxy = provider.getProxy();
+    Assert.assertEquals("user1", provider.getLastProxyUGI().getUserName());
+
+    user2.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() {
+        provider.performFailover(currentProxy.proxy);
+        return null;
+      }
+    });
+    Assert.assertEquals("user1", provider.getLastProxyUGI().getUserName());
+
+    provider.close();
+  }
+
+  protected static class TestableFederationRMFailoverProxyProvider<T>
+      extends FederationRMFailoverProxyProvider<T> {
+
+    private UserGroupInformation lastProxyUGI = null;
+
+    @Override
+    protected T createRMProxy(InetSocketAddress rmAddress) throws IOException {
+      lastProxyUGI = UserGroupInformation.getCurrentUser();
+      return super.createRMProxy(rmAddress);
+    }
+
+    public UserGroupInformation getLastProxyUGI() {
+      return lastProxyUGI;
+    }
+  }
 }

+ 49 - 39
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java

@@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.federation.failover;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Collection;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -29,14 +29,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
 import org.apache.hadoop.yarn.client.RMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -44,6 +42,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -64,7 +63,7 @@ public class FederationRMFailoverProxyProvider<T>
   private YarnConfiguration conf;
   private FederationStateStoreFacade facade;
   private SubClusterId subClusterId;
-  private Collection<Token<? extends TokenIdentifier>> originalTokens;
+  private UserGroupInformation originalUser;
   private boolean federationFailoverEnabled = false;
 
   @Override
@@ -97,59 +96,67 @@ public class FederationRMFailoverProxyProvider<T>
             YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));
 
     try {
-      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-      originalTokens = currentUser.getTokens();
+      this.originalUser = UserGroupInformation.getCurrentUser();
       LOG.info("Initialized Federation proxy for user: {}",
-          currentUser.getUserName());
+          this.originalUser.getUserName());
     } catch (IOException e) {
       LOG.warn("Could not get information of requester, ignoring for now.");
+      this.originalUser = null;
     }
 
   }
 
-  private void addOriginalTokens(UserGroupInformation currentUser) {
-    if (originalTokens == null || originalTokens.isEmpty()) {
-      return;
-    }
-    for (Token<? extends TokenIdentifier> token : originalTokens) {
-      currentUser.addToken(token);
-    }
+  @VisibleForTesting
+  protected T createRMProxy(InetSocketAddress rmAddress) throws IOException {
+    return rmProxy.getProxy(conf, protocol, rmAddress);
   }
 
   private T getProxyInternal(boolean isFailover) {
     SubClusterInfo subClusterInfo;
-    UserGroupInformation currentUser = null;
+    // Use the existing proxy as a backup in case getting the new proxy fails.
+    // Note that if the first time it fails, the backup is also null. In that
+    // case we will hit NullPointerException and throw it back to AM.
+    T proxy = this.current;
     try {
       LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
           subClusterId);
       subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
       // updating the conf with the refreshed RM addresses as proxy
-      // creations
-      // are based out of conf
+      // creations are based out of conf
       updateRMAddress(subClusterInfo);
-      currentUser = UserGroupInformation.getCurrentUser();
-      addOriginalTokens(currentUser);
-    } catch (YarnException e) {
+      if (this.originalUser == null) {
+        InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
+        LOG.info(
+            "Connecting to {} subClusterId {} with protocol {}"
+                + " without a proxy user",
+            rmAddress, subClusterId, protocol.getSimpleName());
+        proxy = createRMProxy(rmAddress);
+      } else {
+        // If the original ugi exists, always use that to create proxy because
+        // it contains up-to-date AMRMToken
+        proxy = this.originalUser.doAs(new PrivilegedExceptionAction<T>() {
+          @Override
+          public T run() throws IOException {
+            InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
+            LOG.info(
+                "Connecting to {} subClusterId {} with protocol {} as user {}",
+                rmAddress, subClusterId, protocol.getSimpleName(),
+                originalUser);
+            return createRMProxy(rmAddress);
+          }
+        });
+      }
+    } catch (Exception e) {
       LOG.error("Exception while trying to create proxy to the ResourceManager"
           + " for SubClusterId: {}", subClusterId, e);
-      return null;
-    } catch (IOException e) {
-      LOG.warn("Could not get information of requester, ignoring for now.");
-    }
-    try {
-      final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
-      LOG.info("Connecting to {} with protocol {} as user: {}", rmAddress,
-          protocol.getSimpleName(), currentUser);
-      LOG.info("Failed over to the RM at {} for SubClusterId: {}", rmAddress,
-          subClusterId);
-      return rmProxy.getProxy(conf, protocol, rmAddress);
-    } catch (IOException ioe) {
-      LOG.error(
-          "IOException while trying to create proxy to the ResourceManager"
-              + " for SubClusterId: {}",
-          subClusterId, ioe);
-      return null;
+      if (proxy == null) {
+        throw new YarnRuntimeException(
+            String.format("Create initial proxy to the ResourceManager for"
+                + " SubClusterId %s failed", subClusterId),
+            e);
+      }
     }
+    return proxy;
   }
 
   private void updateRMAddress(SubClusterInfo subClusterInfo) {
@@ -177,8 +184,11 @@ public class FederationRMFailoverProxyProvider<T>
 
   @Override
   public synchronized void performFailover(T currentProxy) {
-    closeInternal(currentProxy);
+    // It will not return null proxy here
     current = getProxyInternal(federationFailoverEnabled);
+    if (current != currentProxy) {
+      closeInternal(currentProxy);
+    }
   }
 
   @Override