|
@@ -64,7 +64,8 @@ public class FederationRMFailoverProxyProvider<T>
|
|
|
private FederationStateStoreFacade facade;
|
|
|
private SubClusterId subClusterId;
|
|
|
private UserGroupInformation originalUser;
|
|
|
- private boolean federationFailoverEnabled = false;
|
|
|
+ private boolean federationFailoverEnabled;
|
|
|
+ private boolean flushFacadeCacheForYarnRMAddr;
|
|
|
|
|
|
@Override
|
|
|
public void init(Configuration configuration, RMProxy<T> proxy,
|
|
@@ -75,13 +76,16 @@ public class FederationRMFailoverProxyProvider<T>
|
|
|
String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID);
|
|
|
Preconditions.checkNotNull(clusterId, "Missing RM ClusterId");
|
|
|
this.subClusterId = SubClusterId.newInstance(clusterId);
|
|
|
- this.facade = facade.getInstance();
|
|
|
+ this.facade = FederationStateStoreFacade.getInstance();
|
|
|
if (configuration instanceof YarnConfiguration) {
|
|
|
this.conf = (YarnConfiguration) configuration;
|
|
|
}
|
|
|
federationFailoverEnabled =
|
|
|
conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
|
|
|
YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
|
|
|
+ flushFacadeCacheForYarnRMAddr =
|
|
|
+ conf.getBoolean(YarnConfiguration.FEDERATION_FLUSh_CACHE_FOR_RM_ADDR,
|
|
|
+ YarnConfiguration.DEFAULT_FEDERATION_FLUSh_CACHE_FOR_RM_ADDR);
|
|
|
|
|
|
conf.setInt(
|
|
|
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
|
|
@@ -119,7 +123,8 @@ public class FederationRMFailoverProxyProvider<T>
|
|
|
try {
|
|
|
LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
|
|
|
subClusterId);
|
|
|
- subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
|
|
|
+ subClusterInfo = facade.getSubCluster(subClusterId,
|
|
|
+ this.flushFacadeCacheForYarnRMAddr && isFailover);
|
|
|
// updating the conf with the refreshed RM addresses as proxy
|
|
|
// creations are based out of conf
|
|
|
updateRMAddress(subClusterInfo);
|