|
@@ -0,0 +1,266 @@
|
|
|
|
+/**
|
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one or more
|
|
|
|
+ * contributor license agreements. See the NOTICE file distributed with this
|
|
|
|
+ * work for additional information regarding copyright ownership. The ASF
|
|
|
|
+ * licenses this file to you under the Apache License, Version 2.0 (the
|
|
|
|
+ * "License"); you may not use this file except in compliance with the License.
|
|
|
|
+ * You may obtain a copy of the License at
|
|
|
|
+ *
|
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
+ *
|
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
|
|
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
|
|
+ * License for the specific language governing permissions and limitations under
|
|
|
|
+ * the License.
|
|
|
|
+ */
|
|
|
|
+
|
|
|
|
+package org.apache.hadoop.yarn.server.federation.policies;
|
|
|
|
+
|
|
|
|
+import java.nio.ByteBuffer;
|
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
|
+import java.util.Map;
|
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
+
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
|
|
|
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
|
|
|
+
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * This class provides a facade to the policy subsystem, and handles the
|
|
|
|
+ * lifecycle of policies (e.g., refresh from remote, default behaviors etc.).
|
|
|
|
+ */
|
|
|
|
+public class RouterPolicyFacade {
|
|
|
|
+
|
|
|
|
+ private static final Log LOG =
|
|
|
|
+ LogFactory.getLog(LocalityMulticastAMRMProxyPolicy.class);
|
|
|
|
+
|
|
|
|
+ private final SubClusterResolver subClusterResolver;
|
|
|
|
+ private final FederationStateStoreFacade federationFacade;
|
|
|
|
+ private Map<String, SubClusterPolicyConfiguration> globalConfMap;
|
|
|
|
+
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ Map<String, FederationRouterPolicy> globalPolicyMap;
|
|
|
|
+
|
|
|
|
+ public RouterPolicyFacade(YarnConfiguration conf,
|
|
|
|
+ FederationStateStoreFacade facade, SubClusterResolver resolver,
|
|
|
|
+ SubClusterId homeSubcluster)
|
|
|
|
+ throws FederationPolicyInitializationException {
|
|
|
|
+
|
|
|
|
+ this.federationFacade = facade;
|
|
|
|
+ this.subClusterResolver = resolver;
|
|
|
|
+ this.globalConfMap = new ConcurrentHashMap<>();
|
|
|
|
+ this.globalPolicyMap = new ConcurrentHashMap<>();
|
|
|
|
+
|
|
|
|
+ // load default behavior from store if possible
|
|
|
|
+ String defaulKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
|
|
|
|
+ SubClusterPolicyConfiguration configuration = null;
|
|
|
|
+ try {
|
|
|
|
+ configuration = federationFacade.getPolicyConfiguration(defaulKey);
|
|
|
|
+ } catch (YarnException e) {
|
|
|
|
+ LOG.warn("No fallback behavior defined in store, defaulting to XML "
|
|
|
|
+ + "configuration fallback behavior.");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // or from XML conf otherwise.
|
|
|
|
+ if (configuration == null) {
|
|
|
|
+ String defaultFederationPolicyManager =
|
|
|
|
+ conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER,
|
|
|
|
+ YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
|
|
|
|
+ String defaultPolicyParamString =
|
|
|
|
+ conf.get(YarnConfiguration.FEDERATION_POLICY_MANAGER_PARAMS,
|
|
|
|
+ YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
|
|
|
|
+ ByteBuffer defaultPolicyParam = ByteBuffer
|
|
|
|
+ .wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8));
|
|
|
|
+
|
|
|
|
+ configuration = SubClusterPolicyConfiguration.newInstance(defaulKey,
|
|
|
|
+ defaultFederationPolicyManager, defaultPolicyParam);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // construct the required policy manager
|
|
|
|
+ FederationPolicyInitializationContext fallbackContext =
|
|
|
|
+ new FederationPolicyInitializationContext(configuration,
|
|
|
|
+ subClusterResolver, federationFacade, homeSubcluster);
|
|
|
|
+ FederationPolicyManager fallbackPolicyManager =
|
|
|
|
+ instantiatePolicyManager(configuration.getType());
|
|
|
|
+ fallbackPolicyManager.setQueue(defaulKey);
|
|
|
|
+
|
|
|
|
+ // add to the cache the fallback behavior
|
|
|
|
+ globalConfMap.put(defaulKey,
|
|
|
|
+ fallbackContext.getSubClusterPolicyConfiguration());
|
|
|
|
+ globalPolicyMap.put(defaulKey,
|
|
|
|
+ fallbackPolicyManager.getRouterPolicy(fallbackContext, null));
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * This method provides a wrapper of all policy functionalities for routing .
|
|
|
|
+ * Internally it manages configuration changes, and policy init/reinit.
|
|
|
|
+ *
|
|
|
|
+ * @param appSubmissionContext the application to route.
|
|
|
|
+ *
|
|
|
|
+ * @return the id of the subcluster that will be the "home" for this
|
|
|
|
+ * application.
|
|
|
|
+ *
|
|
|
|
+ * @throws YarnException if there are issues initializing policies, or no
|
|
|
|
+ * valid sub-cluster id could be found for this app.
|
|
|
|
+ */
|
|
|
|
+ public SubClusterId getHomeSubcluster(
|
|
|
|
+ ApplicationSubmissionContext appSubmissionContext) throws YarnException {
|
|
|
|
+
|
|
|
|
+ // the maps are concurrent, but we need to protect from reset()
|
|
|
|
+ // reinitialization mid-execution by creating a new reference local to this
|
|
|
|
+ // method.
|
|
|
|
+ Map<String, SubClusterPolicyConfiguration> cachedConfs = globalConfMap;
|
|
|
|
+ Map<String, FederationRouterPolicy> policyMap = globalPolicyMap;
|
|
|
|
+
|
|
|
|
+ if (appSubmissionContext == null) {
|
|
|
|
+ throw new FederationPolicyException(
|
|
|
|
+ "The ApplicationSubmissionContext " + "cannot be null.");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ String queue = appSubmissionContext.getQueue();
|
|
|
|
+
|
|
|
|
+ // respecting YARN behavior we assume default queue if the queue is not
|
|
|
|
+ // specified. This also ensures that "null" can be used as a key to get the
|
|
|
|
+ // default behavior.
|
|
|
|
+ if (queue == null) {
|
|
|
|
+ queue = YarnConfiguration.DEFAULT_QUEUE_NAME;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // the facade might cache this request, based on its parameterization
|
|
|
|
+ SubClusterPolicyConfiguration configuration = null;
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ configuration = federationFacade.getPolicyConfiguration(queue);
|
|
|
|
+ } catch (YarnException e) {
|
|
|
|
+ LOG.debug(e);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // If there is no policy configured for this queue, fallback to the baseline
|
|
|
|
+ // policy that is configured either in the store or via XML config (and
|
|
|
|
+ // cached)
|
|
|
|
+ if (configuration == null) {
|
|
|
|
+ try {
|
|
|
|
+ LOG.warn("There is no policies configured for queue: " + queue + " we"
|
|
|
|
+ + " fallback to default policy for: "
|
|
|
|
+ + YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
|
|
|
|
+
|
|
|
|
+ queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
|
|
|
|
+ configuration = federationFacade.getPolicyConfiguration(
|
|
|
|
+ YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
|
|
|
|
+ } catch (YarnException e) {
|
|
|
|
+ // the fallback is not configure via store, but via XML, using
|
|
|
|
+ // previously loaded configuration.
|
|
|
|
+ configuration =
|
|
|
|
+ cachedConfs.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // if the configuration has changed since last loaded, reinit the policy
|
|
|
|
+ // based on current configuration
|
|
|
|
+ if (!cachedConfs.containsKey(queue)
|
|
|
|
+ || !cachedConfs.get(queue).equals(configuration)) {
|
|
|
|
+ singlePolicyReinit(policyMap, cachedConfs, queue, configuration);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ FederationRouterPolicy policy = policyMap.get(queue);
|
|
|
|
+ if (policy == null) {
|
|
|
|
+ // this should never happen, as the to maps are updated together
|
|
|
|
+ throw new FederationPolicyException("No FederationRouterPolicy found "
|
|
|
|
+ + "for queue: " + appSubmissionContext.getQueue() + " (for "
|
|
|
|
+ + "application: " + appSubmissionContext.getApplicationId() + ") "
|
|
|
|
+ + "and no default specified.");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return policy.getHomeSubcluster(appSubmissionContext);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * This method reinitializes a policy and loads it in the policyMap.
|
|
|
|
+ *
|
|
|
|
+ * @param queue the queue to initialize a policy for.
|
|
|
|
+ * @param conf the configuration to use for initalization.
|
|
|
|
+ *
|
|
|
|
+ * @throws FederationPolicyInitializationException if initialization fails.
|
|
|
|
+ */
|
|
|
|
+ private void singlePolicyReinit(Map<String, FederationRouterPolicy> policyMap,
|
|
|
|
+ Map<String, SubClusterPolicyConfiguration> cachedConfs, String queue,
|
|
|
|
+ SubClusterPolicyConfiguration conf)
|
|
|
|
+ throws FederationPolicyInitializationException {
|
|
|
|
+
|
|
|
|
+ FederationPolicyInitializationContext context =
|
|
|
|
+ new FederationPolicyInitializationContext(conf, subClusterResolver,
|
|
|
|
+ federationFacade, null);
|
|
|
|
+ String newType = context.getSubClusterPolicyConfiguration().getType();
|
|
|
|
+ FederationRouterPolicy routerPolicy = policyMap.get(queue);
|
|
|
|
+
|
|
|
|
+ FederationPolicyManager federationPolicyManager =
|
|
|
|
+ instantiatePolicyManager(newType);
|
|
|
|
+ // set queue, reinit policy if required (implementation lazily check
|
|
|
|
+ // content of conf), and cache it
|
|
|
|
+ federationPolicyManager.setQueue(queue);
|
|
|
|
+ routerPolicy =
|
|
|
|
+ federationPolicyManager.getRouterPolicy(context, routerPolicy);
|
|
|
|
+
|
|
|
|
+ // we need the two put to be atomic (across multiple threads invoking
|
|
|
|
+ // this and reset operations)
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ policyMap.put(queue, routerPolicy);
|
|
|
|
+ cachedConfs.put(queue, conf);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static FederationPolicyManager instantiatePolicyManager(
|
|
|
|
+ String newType) throws FederationPolicyInitializationException {
|
|
|
|
+ FederationPolicyManager federationPolicyManager = null;
|
|
|
|
+ try {
|
|
|
|
+ // create policy instance and set queue
|
|
|
|
+ Class c = Class.forName(newType);
|
|
|
|
+ federationPolicyManager = (FederationPolicyManager) c.newInstance();
|
|
|
|
+ } catch (ClassNotFoundException e) {
|
|
|
|
+ throw new FederationPolicyInitializationException(e);
|
|
|
|
+ } catch (InstantiationException e) {
|
|
|
|
+ throw new FederationPolicyInitializationException(e);
|
|
|
|
+ } catch (IllegalAccessException e) {
|
|
|
|
+ throw new FederationPolicyInitializationException(e);
|
|
|
|
+ }
|
|
|
|
+ return federationPolicyManager;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * This method flushes all cached configurations and policies. This should be
|
|
|
|
+ * invoked if the facade remains activity after very large churn of queues in
|
|
|
|
+ * the system.
|
|
|
|
+ */
|
|
|
|
+ public synchronized void reset() {
|
|
|
|
+
|
|
|
|
+ // remember the fallBack
|
|
|
|
+ SubClusterPolicyConfiguration conf =
|
|
|
|
+ globalConfMap.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
|
|
|
|
+ FederationRouterPolicy policy =
|
|
|
|
+ globalPolicyMap.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
|
|
|
|
+
|
|
|
|
+ globalConfMap = new ConcurrentHashMap<>();
|
|
|
|
+ globalPolicyMap = new ConcurrentHashMap<>();
|
|
|
|
+
|
|
|
|
+ // add to the cache a fallback with keyword null
|
|
|
|
+ globalConfMap.put(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY, conf);
|
|
|
|
+ globalPolicyMap.put(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY,
|
|
|
|
+ policy);
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+}
|