|
@@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.federation.failover;
|
|
|
import java.io.Closeable;
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
-import java.util.Collection;
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
|
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
@@ -29,14 +29,12 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
-import org.apache.hadoop.security.token.Token;
|
|
|
-import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|
|
import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
|
|
|
import org.apache.hadoop.yarn.client.RMProxy;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
-import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
|
@@ -44,6 +42,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.base.Preconditions;
|
|
|
|
|
|
/**
|
|
@@ -64,7 +63,7 @@ public class FederationRMFailoverProxyProvider<T>
|
|
|
private YarnConfiguration conf;
|
|
|
private FederationStateStoreFacade facade;
|
|
|
private SubClusterId subClusterId;
|
|
|
- private Collection<Token<? extends TokenIdentifier>> originalTokens;
|
|
|
+ private UserGroupInformation originalUser;
|
|
|
private boolean federationFailoverEnabled = false;
|
|
|
|
|
|
@Override
|
|
@@ -97,59 +96,67 @@ public class FederationRMFailoverProxyProvider<T>
|
|
|
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));
|
|
|
|
|
|
try {
|
|
|
- UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
|
|
|
- originalTokens = currentUser.getTokens();
|
|
|
+ this.originalUser = UserGroupInformation.getCurrentUser();
|
|
|
LOG.info("Initialized Federation proxy for user: {}",
|
|
|
- currentUser.getUserName());
|
|
|
+ this.originalUser.getUserName());
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Could not get information of requester, ignoring for now.");
|
|
|
+ this.originalUser = null;
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
- private void addOriginalTokens(UserGroupInformation currentUser) {
|
|
|
- if (originalTokens == null || originalTokens.isEmpty()) {
|
|
|
- return;
|
|
|
- }
|
|
|
- for (Token<? extends TokenIdentifier> token : originalTokens) {
|
|
|
- currentUser.addToken(token);
|
|
|
- }
|
|
|
+ @VisibleForTesting
|
|
|
+ protected T createRMProxy(InetSocketAddress rmAddress) throws IOException {
|
|
|
+ return rmProxy.getProxy(conf, protocol, rmAddress);
|
|
|
}
|
|
|
|
|
|
private T getProxyInternal(boolean isFailover) {
|
|
|
SubClusterInfo subClusterInfo;
|
|
|
- UserGroupInformation currentUser = null;
|
|
|
+ // Use the existing proxy as a backup in case getting the new proxy fails.
|
|
|
+ // Note that if the first time it fails, the backup is also null. In that
|
|
|
+ // case we will hit NullPointerException and throw it back to AM.
|
|
|
+ T proxy = this.current;
|
|
|
try {
|
|
|
LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
|
|
|
subClusterId);
|
|
|
subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
|
|
|
// updating the conf with the refreshed RM addresses as proxy
|
|
|
- // creations
|
|
|
- // are based out of conf
|
|
|
+ // creations are based out of conf
|
|
|
updateRMAddress(subClusterInfo);
|
|
|
- currentUser = UserGroupInformation.getCurrentUser();
|
|
|
- addOriginalTokens(currentUser);
|
|
|
- } catch (YarnException e) {
|
|
|
+ if (this.originalUser == null) {
|
|
|
+ InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
|
|
|
+ LOG.info(
|
|
|
+ "Connecting to {} subClusterId {} with protocol {}"
|
|
|
+ + " without a proxy user",
|
|
|
+ rmAddress, subClusterId, protocol.getSimpleName());
|
|
|
+ proxy = createRMProxy(rmAddress);
|
|
|
+ } else {
|
|
|
+ // If the original ugi exists, always use that to create proxy because
|
|
|
+ // it contains up-to-date AMRMToken
|
|
|
+ proxy = this.originalUser.doAs(new PrivilegedExceptionAction<T>() {
|
|
|
+ @Override
|
|
|
+ public T run() throws IOException {
|
|
|
+ InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
|
|
|
+ LOG.info(
|
|
|
+ "Connecting to {} subClusterId {} with protocol {} as user {}",
|
|
|
+ rmAddress, subClusterId, protocol.getSimpleName(),
|
|
|
+ originalUser);
|
|
|
+ return createRMProxy(rmAddress);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
LOG.error("Exception while trying to create proxy to the ResourceManager"
|
|
|
+ " for SubClusterId: {}", subClusterId, e);
|
|
|
- return null;
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Could not get information of requester, ignoring for now.");
|
|
|
- }
|
|
|
- try {
|
|
|
- final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
|
|
|
- LOG.info("Connecting to {} with protocol {} as user: {}", rmAddress,
|
|
|
- protocol.getSimpleName(), currentUser);
|
|
|
- LOG.info("Failed over to the RM at {} for SubClusterId: {}", rmAddress,
|
|
|
- subClusterId);
|
|
|
- return rmProxy.getProxy(conf, protocol, rmAddress);
|
|
|
- } catch (IOException ioe) {
|
|
|
- LOG.error(
|
|
|
- "IOException while trying to create proxy to the ResourceManager"
|
|
|
- + " for SubClusterId: {}",
|
|
|
- subClusterId, ioe);
|
|
|
- return null;
|
|
|
+ if (proxy == null) {
|
|
|
+ throw new YarnRuntimeException(
|
|
|
+ String.format("Create initial proxy to the ResourceManager for"
|
|
|
+ + " SubClusterId %s failed", subClusterId),
|
|
|
+ e);
|
|
|
+ }
|
|
|
}
|
|
|
+ return proxy;
|
|
|
}
|
|
|
|
|
|
private void updateRMAddress(SubClusterInfo subClusterInfo) {
|
|
@@ -177,8 +184,11 @@ public class FederationRMFailoverProxyProvider<T>
|
|
|
|
|
|
@Override
|
|
|
public synchronized void performFailover(T currentProxy) {
|
|
|
- closeInternal(currentProxy);
|
|
|
+ // It will not return null proxy here
|
|
|
current = getProxyInternal(federationFailoverEnabled);
|
|
|
+ if (current != currentProxy) {
|
|
|
+ closeInternal(currentProxy);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|