|
@@ -23,13 +23,10 @@ import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
-import org.apache.commons.logging.Log;
|
|
|
|
-import org.apache.commons.logging.LogFactory;
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
|
|
|
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
|
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
|
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
|
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
|
|
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
|
|
@@ -38,6 +35,8 @@ import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
@@ -47,8 +46,8 @@ import com.google.common.annotations.VisibleForTesting;
|
|
*/
|
|
*/
|
|
public class RouterPolicyFacade {
|
|
public class RouterPolicyFacade {
|
|
|
|
|
|
- private static final Log LOG =
|
|
|
|
- LogFactory.getLog(LocalityMulticastAMRMProxyPolicy.class);
|
|
|
|
|
|
+ private static final Logger LOG =
|
|
|
|
+ LoggerFactory.getLogger(RouterPolicyFacade.class);
|
|
|
|
|
|
private final SubClusterResolver subClusterResolver;
|
|
private final SubClusterResolver subClusterResolver;
|
|
private final FederationStateStoreFacade federationFacade;
|
|
private final FederationStateStoreFacade federationFacade;
|
|
@@ -68,10 +67,10 @@ public class RouterPolicyFacade {
|
|
this.globalPolicyMap = new ConcurrentHashMap<>();
|
|
this.globalPolicyMap = new ConcurrentHashMap<>();
|
|
|
|
|
|
// load default behavior from store if possible
|
|
// load default behavior from store if possible
|
|
- String defaulKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
|
|
|
|
|
|
+ String defaultKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
|
|
SubClusterPolicyConfiguration configuration = null;
|
|
SubClusterPolicyConfiguration configuration = null;
|
|
try {
|
|
try {
|
|
- configuration = federationFacade.getPolicyConfiguration(defaulKey);
|
|
|
|
|
|
+ configuration = federationFacade.getPolicyConfiguration(defaultKey);
|
|
} catch (YarnException e) {
|
|
} catch (YarnException e) {
|
|
LOG.warn("No fallback behavior defined in store, defaulting to XML "
|
|
LOG.warn("No fallback behavior defined in store, defaulting to XML "
|
|
+ "configuration fallback behavior.");
|
|
+ "configuration fallback behavior.");
|
|
@@ -88,7 +87,7 @@ public class RouterPolicyFacade {
|
|
ByteBuffer defaultPolicyParam = ByteBuffer
|
|
ByteBuffer defaultPolicyParam = ByteBuffer
|
|
.wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8));
|
|
.wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8));
|
|
|
|
|
|
- configuration = SubClusterPolicyConfiguration.newInstance(defaulKey,
|
|
|
|
|
|
+ configuration = SubClusterPolicyConfiguration.newInstance(defaultKey,
|
|
defaultFederationPolicyManager, defaultPolicyParam);
|
|
defaultFederationPolicyManager, defaultPolicyParam);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -98,12 +97,12 @@ public class RouterPolicyFacade {
|
|
subClusterResolver, federationFacade, homeSubcluster);
|
|
subClusterResolver, federationFacade, homeSubcluster);
|
|
FederationPolicyManager fallbackPolicyManager =
|
|
FederationPolicyManager fallbackPolicyManager =
|
|
FederationPolicyUtils.instantiatePolicyManager(configuration.getType());
|
|
FederationPolicyUtils.instantiatePolicyManager(configuration.getType());
|
|
- fallbackPolicyManager.setQueue(defaulKey);
|
|
|
|
|
|
+ fallbackPolicyManager.setQueue(defaultKey);
|
|
|
|
|
|
// add to the cache the fallback behavior
|
|
// add to the cache the fallback behavior
|
|
- globalConfMap.put(defaulKey,
|
|
|
|
|
|
+ globalConfMap.put(defaultKey,
|
|
fallbackContext.getSubClusterPolicyConfiguration());
|
|
fallbackContext.getSubClusterPolicyConfiguration());
|
|
- globalPolicyMap.put(defaulKey,
|
|
|
|
|
|
+ globalPolicyMap.put(defaultKey,
|
|
fallbackPolicyManager.getRouterPolicy(fallbackContext, null));
|
|
fallbackPolicyManager.getRouterPolicy(fallbackContext, null));
|
|
|
|
|
|
}
|
|
}
|
|
@@ -155,29 +154,37 @@ public class RouterPolicyFacade {
|
|
try {
|
|
try {
|
|
configuration = federationFacade.getPolicyConfiguration(queue);
|
|
configuration = federationFacade.getPolicyConfiguration(queue);
|
|
} catch (YarnException e) {
|
|
} catch (YarnException e) {
|
|
- LOG.debug(e);
|
|
|
|
|
|
+ String errMsg = "There is no policy configured for the queue: " + queue
|
|
|
|
+ + ", falling back to defaults.";
|
|
|
|
+ LOG.warn(errMsg, e);
|
|
}
|
|
}
|
|
|
|
|
|
// If there is no policy configured for this queue, fallback to the baseline
|
|
// If there is no policy configured for this queue, fallback to the baseline
|
|
// policy that is configured either in the store or via XML config (and
|
|
// policy that is configured either in the store or via XML config (and
|
|
// cached)
|
|
// cached)
|
|
if (configuration == null) {
|
|
if (configuration == null) {
|
|
- try {
|
|
|
|
- LOG.warn("There is no policies configured for queue: " + queue + " we"
|
|
|
|
- + " fallback to default policy for: "
|
|
|
|
- + YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
|
|
|
|
|
|
+ LOG.warn("There is no policies configured for queue: " + queue + " we"
|
|
|
|
+ + " fallback to default policy for: "
|
|
|
|
+ + YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
|
|
|
|
|
|
- queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
|
|
|
|
- configuration = federationFacade.getPolicyConfiguration(
|
|
|
|
- YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
|
|
|
|
|
|
+ queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
|
|
|
|
+ try {
|
|
|
|
+ configuration = federationFacade.getPolicyConfiguration(queue);
|
|
} catch (YarnException e) {
|
|
} catch (YarnException e) {
|
|
- // the fallback is not configure via store, but via XML, using
|
|
|
|
- // previously loaded configuration.
|
|
|
|
- configuration =
|
|
|
|
- cachedConfs.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
|
|
|
|
|
|
+ String errMsg = "Cannot retrieve policy configured for the queue: "
|
|
|
|
+ + queue + ", falling back to defaults.";
|
|
|
|
+ LOG.warn(errMsg, e);
|
|
|
|
+
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // the fallback is not configure via store, but via XML, using
|
|
|
|
+ // previously loaded configuration.
|
|
|
|
+ if (configuration == null) {
|
|
|
|
+ configuration =
|
|
|
|
+ cachedConfs.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
|
|
|
|
+ }
|
|
|
|
+
|
|
// if the configuration has changed since last loaded, reinit the policy
|
|
// if the configuration has changed since last loaded, reinit the policy
|
|
// based on current configuration
|
|
// based on current configuration
|
|
if (!cachedConfs.containsKey(queue)
|
|
if (!cachedConfs.containsKey(queue)
|