Przeglądaj źródła

YARN-11037. Add configurable logic to split resource request to the least loaded SC. (#5515)

slfan1989 2 lat temu
rodzic
commit
ea87aa2f5b
13 zmienionych plików z 623 dodań i 21 usunięć
  1. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/EnhancedHeadroom.java
  2. 39 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 80 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  4. 49 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
  5. 69 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/ContainerAllocationHistory.java
  6. 218 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
  7. 14 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSet.java
  8. 48 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java
  9. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
  10. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java
  11. 96 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java
  12. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java
  13. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/EnhancedHeadroom.java

@@ -69,4 +69,9 @@ public abstract class EnhancedHeadroom {
     sb.append(">");
     return sb.toString();
   }
+
+  public double getNormalizedPendingCount(long multiplier) {
+    int totalPendingCount = getTotalPendingCount();
+    return (double) totalPendingCount * multiplier;
+  }
 }

+ 39 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -4058,6 +4058,45 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =
       60000; // one minute
 
+  // Prefix for configs related to selecting SC based on load
+  public static final String LOAD_BASED_SC_SELECTOR_PREFIX =
+      NM_PREFIX + "least-load-policy-selector.";
+
+  // Config to enable re-rerouting node requests base on SC load
+  public static final String LOAD_BASED_SC_SELECTOR_ENABLED =
+      LOAD_BASED_SC_SELECTOR_PREFIX + "enabled";
+  public static final boolean DEFAULT_LOAD_BASED_SC_SELECTOR_ENABLED = false;
+
+  // Pending container threshold for selecting SC
+  public static final String LOAD_BASED_SC_SELECTOR_THRESHOLD =
+      LOAD_BASED_SC_SELECTOR_PREFIX + "pending-container.threshold";
+  public static final int DEFAULT_LOAD_BASED_SC_SELECTOR_THRESHOLD = 10000;
+
+  // Whether to consider total number of active cores in the subcluster for load
+  public static final String LOAD_BASED_SC_SELECTOR_USE_ACTIVE_CORE =
+      LOAD_BASED_SC_SELECTOR_PREFIX + "use-active-core";
+  public static final boolean DEFAULT_LOAD_BASED_SC_SELECTOR_USE_ACTIVE_CORE = false;
+
+  // multiplier to normalize pending container to active cores
+  public static final String LOAD_BASED_SC_SELECTOR_MULTIPLIER =
+      LOAD_BASED_SC_SELECTOR_PREFIX + "multiplier";
+  public static final int DEFAULT_LOAD_BASED_SC_SELECTOR_MULTIPLIER = 50000;
+
+  // max count to maintain for container allocation history
+  public static final String FEDERATION_ALLOCATION_HISTORY_MAX_ENTRY =
+      FEDERATION_PREFIX + "amrmproxy.allocation.history.max.entry";
+  public static final int DEFAULT_FEDERATION_ALLOCATION_HISTORY_MAX_ENTRY = 100;
+
+  // Whether to fail directly if activeSubCluster is less than 1.
+  public static final String LOAD_BASED_SC_SELECTOR_FAIL_ON_ERROR =
+      LOAD_BASED_SC_SELECTOR_PREFIX + "fail-on-error";
+  public static final boolean DEFAULT_LOAD_BASED_SC_SELECTOR_FAIL_ON_ERROR = true;
+
+  // Blacklisted subClusters.
+  public static final String FEDERATION_BLACKLIST_SUBCLUSTERS =
+      LOAD_BASED_SC_SELECTOR_PREFIX + "blacklist-subclusters";
+  public static final String DEFAULT_FEDERATION_BLACKLIST_SUBCLUSTERS = "";
+
   // AMRMProxy Register UAM Retry-Num
   public static final String FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT =
       FEDERATION_PREFIX + "amrmproxy.register.uam.retry-count";

+ 80 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -5558,4 +5558,84 @@
     <value>0.0.0.0:8070</value>
   </property>
 
+  <property>
+    <description>
+      This configuration will enable request rerouting according to the load of the subCluster.
+      If it is true, it will reroute the request according to the load of the subCluster.
+      The default configuration is false.
+    </description>
+    <name>yarn.nodemanager.least-load-policy-selector.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+      SubCluster pending container threshold. The default value is 10000.
+      This configuration will determine the load weight of a subCluster.
+      For SC with pending containers count bigger than container threshold / 2,
+      use threshold / pending as weight.
+      For SC with pending containers count less than threshold / 2, we cap the weight at 2.
+    </description>
+    <name>yarn.nodemanager.least-load-policy-selector.pending-container.threshold</name>
+    <value>10000</value>
+  </property>
+
+  <property>
+    <description>
+      Whether to consider the configured vcores when calculating the subCluster load.
+      The default value is false, we only consider the number of cluster pending containers.
+      If this configuration item is set to true, This configuration item needs to be used together
+      with yarn.nodemanager.least-load-policy-selector.multiplier. We will use the following formula
+      when calculating subCluster pending.
+      pendingContainersCountNormalize = (totalPendingContainersCount * multiplier) / totalActiveCores.
+    </description>
+    <name>yarn.nodemanager.least-load-policy-selector.use-active-core</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+      Max count to maintain for container allocation history.
+    </description>
+    <name>yarn.federation.amrmproxy.allocation.history.max.entry</name>
+    <value>100</value>
+  </property>
+
+  <property>
+    <description>
+      Whether to fail directly if activeSubCluster is less than 1.
+      The default is true.
+      If We set to false, We will try to re-fetch activeSubCluster list.
+    </description>
+    <name>yarn.nodemanager.least-load-policy-selector.fail-on-error</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <description>
+      The subCluster configured in the blacklist will not be forwarded requests.
+      The default value is empty.
+    </description>
+    <name>yarn.nodemanager.least-load-policy-selector.blacklist-subclusters</name>
+    <value></value>
+  </property>
+
+  <property>
+    <description>
+      Max count to maintain for container allocation history.
+    </description>
+    <name>yarn.federation.amrmproxy.allocation.history.max.entry</name>
+    <value>100</value>
+  </property>
+
+  <property>
+    <description>
+      This configuration will be used
+      when yarn.nodemanager.least-load-policy-selector.use-active-core is set to true.
+      The purpose of this value is to help normalize the pendingContainersCount.
+    </description>
+    <name>yarn.nodemanager.least-load-policy-selector.multiplier</name>
+    <value>50000</value>
+  </property>
+
 </configuration>

+ 49 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java

@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -132,16 +133,23 @@ public class AMRMClientRelayer implements ApplicationMasterProtocol {
 
   private AMRMClientRelayerMetrics metrics;
 
+  private ContainerAllocationHistory allocationHistory;
+
   public AMRMClientRelayer(ApplicationMasterProtocol rmClient,
       ApplicationId appId, String rmId) {
     this.resetResponseId = -1;
     this.metrics = AMRMClientRelayerMetrics.getInstance();
-    this.rmId = "";
     this.rmClient = rmClient;
     this.appId = appId;
     this.rmId = rmId;
   }
 
+  public AMRMClientRelayer(ApplicationMasterProtocol rmClient,
+      ApplicationId appId, String rmId, Configuration conf) {
+    this(rmClient, appId, rmId);
+    this.allocationHistory = new ContainerAllocationHistory(conf);
+  }
+
   public void setAMRegistrationRequest(
       RegisterApplicationMasterRequest registerRequest) {
     this.amRegistrationRequest = registerRequest;
@@ -444,6 +452,8 @@ public class AMRMClientRelayer implements ApplicationMasterProtocol {
         if (this.knownContainers.add(container.getId())) {
           this.metrics.addFulfilledQPS(this.rmId, AMRMClientRelayerMetrics
               .getRequestType(container.getExecutionType()), 1);
+          long currentTime = System.currentTimeMillis();
+          long fulfillLatency = -1;
           if (container.getAllocationRequestId() != 0) {
             Integer count = this.pendingCountForMetrics
                 .get(container.getAllocationRequestId());
@@ -453,13 +463,14 @@ public class AMRMClientRelayer implements ApplicationMasterProtocol {
               this.metrics.decrClientPending(this.rmId,
                   AMRMClientRelayerMetrics
                       .getRequestType(container.getExecutionType()), 1);
-              this.metrics.addFulfillLatency(this.rmId,
-                  AMRMClientRelayerMetrics
-                      .getRequestType(container.getExecutionType()),
-                  System.currentTimeMillis() - this.askTimeStamp
-                      .get(container.getAllocationRequestId()));
+              fulfillLatency = currentTime - this.askTimeStamp.get(
+                  container.getAllocationRequestId());
+              AMRMClientRelayerMetrics.RequestType requestType = AMRMClientRelayerMetrics
+                  .getRequestType(container.getExecutionType());
+              this.metrics.addFulfillLatency(this.rmId, requestType, fulfillLatency);
             }
           }
+          addAllocationHistoryEntry(container, currentTime, fulfillLatency);
         }
       }
     }
@@ -576,6 +587,38 @@ public class AMRMClientRelayer implements ApplicationMasterProtocol {
     this.ask.add(remoteRequest);
   }
 
+  public ContainerAllocationHistory getAllocationHistory() {
+    return this.allocationHistory;
+  }
+
+  private void addAllocationHistoryEntry(Container container, long fulfillTimeStamp,
+      long fulfillLatency) {
+    ResourceRequestSetKey key = ResourceRequestSetKey.extractMatchingKey(container,
+        this.remotePendingAsks.keySet());
+    if (key == null) {
+      LOG.info("allocation history ignoring {}, no matching request key found.", container);
+      return;
+    }
+    this.allocationHistory.addAllocationEntry(container, this.remotePendingAsks.get(key),
+        fulfillTimeStamp, fulfillLatency);
+  }
+
+  public void gatherReadOnlyPendingAsksInfo(Map<ResourceRequestSetKey,
+      ResourceRequestSet> pendingAsks, Map<ResourceRequestSetKey, Long> pendingTime) {
+    pendingAsks.clear();
+    pendingTime.clear();
+    synchronized (this) {
+      pendingAsks.putAll(this.remotePendingAsks);
+      for (ResourceRequestSetKey key : pendingAsks.keySet()) {
+        Long startTime = this.askTimeStamp.get(key.getAllocationRequestId());
+        if (startTime != null) {
+          long elapsedMs = System.currentTimeMillis() - startTime;
+          pendingTime.put(key, elapsedMs);
+        }
+      }
+    }
+  }
+
   @VisibleForTesting
   protected Map<ResourceRequestSetKey, ResourceRequestSet>
       getRemotePendingAsks() {

+ 69 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/ContainerAllocationHistory.java

@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server;
+
+import java.util.AbstractMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.Queue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Records the allocation history from YarnRM and provide aggregated insights.
+ */
+public class ContainerAllocationHistory {
+  private static final Logger LOG = LoggerFactory.getLogger(AMRMClientRelayer.class);
+
+  private int maxEntryCount;
+
+  // Allocate timing history <AllocateTimeStamp, AllocateLatency>
+  private Queue<Entry<Long, Long>> relaxableG = new LinkedList<>();
+
+  public ContainerAllocationHistory(Configuration conf) {
+    this.maxEntryCount = conf.getInt(
+        YarnConfiguration.FEDERATION_ALLOCATION_HISTORY_MAX_ENTRY,
+        YarnConfiguration.DEFAULT_FEDERATION_ALLOCATION_HISTORY_MAX_ENTRY);
+  }
+
+  /**
+   * Record the allocation history for the container.
+   *
+   * @param container to add record for
+   * @param requestSet resource request ask set
+   * @param fulfillTimeStamp time at which allocation happened
+   * @param fulfillLatency time elapsed in allocating since asked
+   */
+  public synchronized void addAllocationEntry(Container container,
+      ResourceRequestSet requestSet, long fulfillTimeStamp, long fulfillLatency){
+    if (!requestSet.isANYRelaxable()) {
+      LOG.info("allocation history ignoring {}, relax locality is false", container);
+      return;
+    }
+    this.relaxableG.add(new AbstractMap.SimpleEntry<>(
+        fulfillTimeStamp, fulfillLatency));
+    if (this.relaxableG.size() > this.maxEntryCount) {
+      this.relaxableG.remove();
+    }
+  }
+}

+ 218 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -31,6 +32,9 @@ import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.collections.MapUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -54,6 +58,19 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.util.Preconditions;
 
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOAD_BASED_SC_SELECTOR_ENABLED;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_LOAD_BASED_SC_SELECTOR_ENABLED;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOAD_BASED_SC_SELECTOR_THRESHOLD;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_LOAD_BASED_SC_SELECTOR_THRESHOLD;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOAD_BASED_SC_SELECTOR_USE_ACTIVE_CORE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_LOAD_BASED_SC_SELECTOR_USE_ACTIVE_CORE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOAD_BASED_SC_SELECTOR_MULTIPLIER;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_LOAD_BASED_SC_SELECTOR_MULTIPLIER;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOAD_BASED_SC_SELECTOR_FAIL_ON_ERROR;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_LOAD_BASED_SC_SELECTOR_FAIL_ON_ERROR;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_BLACKLIST_SUBCLUSTERS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_FEDERATION_BLACKLIST_SUBCLUSTERS;
+
 /**
  * An implementation of the {@link FederationAMRMProxyPolicy} interface that
  * carefully multicasts the requests with the following behavior:
@@ -131,11 +148,44 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
   private Map<SubClusterId, Float> weights;
   private SubClusterResolver resolver;
 
+  private Configuration conf;
   private Map<SubClusterId, Resource> headroom;
   private Map<SubClusterId, EnhancedHeadroom> enhancedHeadroom;
   private float hrAlpha;
   private FederationStateStoreFacade federationFacade;
   private SubClusterId homeSubcluster;
+  private int printRRMax;
+  public static final String PRINT_RR_MAX =
+      "yarn.nodemanager.amrmproxy.address.splitmerge.printmaxrrcount";
+  public static final int DEFAULT_PRINT_RR_MAX = 1000;
+  private boolean failOnError = DEFAULT_LOAD_BASED_SC_SELECTOR_FAIL_ON_ERROR;
+
+  /**
+   * Print a list of Resource Requests into a one line string.
+   *
+   * @param response list of ResourceRequest
+   * @param max number of ResourceRequest to print
+   * @return the printed one line string
+   */
+  public static String prettyPrintRequests(List<ResourceRequest> response, int max) {
+    StringBuilder builder = new StringBuilder();
+    for (ResourceRequest rr : response) {
+      builder.append("[id:").append(rr.getAllocationRequestId())
+          .append(" loc:")
+          .append(rr.getResourceName())
+          .append(" num:")
+          .append(rr.getNumContainers())
+          .append(" pri:")
+          .append(((rr.getPriority() != null) ? rr.getPriority().getPriority() : -1))
+          .append("], ");
+      if (max != -1) {
+        if (max-- <= 0) {
+          break;
+        }
+      }
+    }
+    return builder.toString();
+  }
 
   @Override
   public void reinitialize(
@@ -182,6 +232,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
     weights = newWeightsConverted;
     resolver = policyContext.getFederationSubclusterResolver();
 
+    // Data structures that only need to initialize once
     if (headroom == null) {
       headroom = new ConcurrentHashMap<>();
       enhancedHeadroom = new ConcurrentHashMap<>();
@@ -191,6 +242,11 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
     this.federationFacade =
         policyContext.getFederationStateStoreFacade();
     this.homeSubcluster = policyContext.getHomeSubcluster();
+
+    this.conf = this.federationFacade.getConf();
+    this.printRRMax = this.conf.getInt(PRINT_RR_MAX, DEFAULT_PRINT_RR_MAX);
+    this.failOnError = this.conf.getBoolean(LOAD_BASED_SC_SELECTOR_FAIL_ON_ERROR,
+        DEFAULT_LOAD_BASED_SC_SELECTOR_FAIL_ON_ERROR);
   }
 
   @Override
@@ -217,10 +273,9 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
     // active subclusters. Create a new instance per call because this method
     // can be called concurrently.
     AllocationBookkeeper bookkeeper = new AllocationBookkeeper();
-    bookkeeper.reinitialize(getActiveSubclusters(), timedOutSubClusters);
+    bookkeeper.reinitialize(getActiveSubclusters(), timedOutSubClusters, conf);
 
-    List<ResourceRequest> nonLocalizedRequests =
-        new ArrayList<ResourceRequest>();
+    List<ResourceRequest> nonLocalizedRequests = new ArrayList<>();
 
     SubClusterId targetId = null;
     Set<SubClusterId> targetIds = null;
@@ -240,6 +295,17 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
       // Handle "node" requests
       try {
         targetId = resolver.getSubClusterForNode(rr.getResourceName());
+
+        // If needed, re-reroute node requests base on SC load
+        boolean loadBasedSCSelectorEnabled =
+            conf.getBoolean(LOAD_BASED_SC_SELECTOR_ENABLED, DEFAULT_LOAD_BASED_SC_SELECTOR_ENABLED);
+        if (loadBasedSCSelectorEnabled) {
+          int maxPendingThreshold = conf.getInt(LOAD_BASED_SC_SELECTOR_THRESHOLD,
+              DEFAULT_LOAD_BASED_SC_SELECTOR_THRESHOLD);
+          targetId = routeNodeRequestIfNeeded(targetId, maxPendingThreshold,
+              bookkeeper.getActiveAndEnabledSC());
+        }
+        LOG.debug("Node request {}", rr.getResourceName());
       } catch (YarnException e) {
         // this might happen as we can't differentiate node from rack names
         // we log altogether later
@@ -285,7 +351,16 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
     // handle all non-localized requests (ANY)
     splitAnyRequests(nonLocalizedRequests, bookkeeper);
 
-    return bookkeeper.getAnswer();
+    // Take the split result, feed into the askBalancer
+    Map<SubClusterId, List<ResourceRequest>> answer = bookkeeper.getAnswer();
+    LOG.info("Before split {} RRs: {}", resourceRequests.size(),
+        prettyPrintRequests(resourceRequests, this.printRRMax));
+
+    for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : bookkeeper.getAnswer().entrySet()) {
+      LOG.info("After split {} has {} RRs: {}", entry.getKey(), entry.getValue().size(),
+          prettyPrintRequests(entry.getValue(), this.printRRMax));
+    }
+    return answer;
   }
 
   /**
@@ -495,6 +570,120 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
     return headroomWeighting;
   }
 
+  /**
+   * When certain subcluster is too loaded, reroute Node requests going there.
+   *
+   * @param targetId current subClusterId where request is sent
+   * @param maxThreshold threshold for Pending count
+   * @param activeAndEnabledSCs list of active sc
+   * @return subClusterId target sc id
+   */
+  protected SubClusterId routeNodeRequestIfNeeded(SubClusterId targetId,
+      int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) {
+    // If targetId is not in the active and enabled SC list, reroute the traffic
+    if (activeAndEnabledSCs.contains(targetId)) {
+      int targetPendingCount = getSubClusterLoad(targetId);
+      if (targetPendingCount == -1 || targetPendingCount < maxThreshold) {
+        return targetId;
+      }
+    }
+    SubClusterId scId = chooseSubClusterIdForMaxLoadSC(targetId, maxThreshold, activeAndEnabledSCs);
+    return scId;
+  }
+
+  /**
+   * Check if the current target subcluster is over max load, and if it is
+   * reroute it.
+   *
+   * @param targetId            the original target subcluster id
+   * @param maxThreshold        the max load threshold to reroute
+   * @param activeAndEnabledSCs the list of active and enabled subclusters
+   * @return targetId if it is within maxThreshold, otherwise a new id
+   */
+  private SubClusterId chooseSubClusterIdForMaxLoadSC(SubClusterId targetId,
+      int maxThreshold, Set<SubClusterId> activeAndEnabledSCs) {
+    ArrayList<Float> weight = new ArrayList<>();
+    ArrayList<SubClusterId> scIds = new ArrayList<>();
+    int targetLoad = getSubClusterLoad(targetId);
+    if (targetLoad == -1 || !activeAndEnabledSCs.contains(targetId)) {
+      // Probably a SC that's not active and enabled. Forcing a reroute
+      targetLoad = Integer.MAX_VALUE;
+    }
+
+    /*
+     * Prepare the weight for a random draw among all known SCs.
+     *
+     * For SC with pending bigger than maxThreshold / 2, use maxThreshold /
+     * pending as weight. We multiplied by maxThreshold so that the weight
+     * won't be too small in value.
+     *
+     * For SC with pending less than maxThreshold / 2, we cap the weight at 2
+     * = (maxThreshold / (maxThreshold / 2)) so that SC with small pending
+     * will not get a huge weight and thus get swamped.
+     */
+    for (SubClusterId sc : activeAndEnabledSCs) {
+      int scLoad = getSubClusterLoad(sc);
+      if (scLoad > targetLoad) {
+        // Never mind if it is not the most loaded SC
+        return targetId;
+      }
+      if (scLoad <= maxThreshold / 2) {
+        weight.add(2f);
+      } else {
+        weight.add((float) maxThreshold / scLoad);
+      }
+      scIds.add(sc);
+    }
+    if (weights.size() == 0) {
+      return targetId;
+    }
+    return scIds.get(FederationPolicyUtils.getWeightedRandom(weight));
+  }
+
+  /**
+   * get the Load data of the subCluster.
+   *
+   * @param subClusterId subClusterId.
+   * @return The number of pending containers for the subCluster.
+   */
+  private int getSubClusterLoad(SubClusterId subClusterId) {
+    EnhancedHeadroom headroomData = this.enhancedHeadroom.get(subClusterId);
+    if (headroomData == null) {
+      return -1;
+    }
+
+    // Use new data from enhanced headroom
+    boolean useActiveCoreEnabled = conf.getBoolean(LOAD_BASED_SC_SELECTOR_USE_ACTIVE_CORE,
+        DEFAULT_LOAD_BASED_SC_SELECTOR_USE_ACTIVE_CORE);
+
+    // If we consider the number of vCores in the subCluster
+    if (useActiveCoreEnabled) {
+
+      // If the vcore of the subCluster is less than or equal to 0,
+      // it means that containers cannot be scheduled to this subCluster,
+      // and we will return a very large number, indicating that the subCluster is unavailable.
+      if (headroomData.getTotalActiveCores() <= 0) {
+        return Integer.MAX_VALUE;
+      }
+
+      // Multiply by a constant factor, to ensure the numerator > denominator.
+      // We will normalize the PendingCount, using PendingCount * multiplier / TotalActiveCores.
+      long multiplier = conf.getLong(LOAD_BASED_SC_SELECTOR_MULTIPLIER,
+          DEFAULT_LOAD_BASED_SC_SELECTOR_MULTIPLIER);
+      double value =
+          headroomData.getNormalizedPendingCount(multiplier) / headroomData.getTotalActiveCores();
+      if (value > Integer.MAX_VALUE) {
+        return Integer.MAX_VALUE;
+      } else {
+        return (int) value;
+      }
+    } else {
+      // If the number of vcores in the subCluster is not considered,
+      // we directly return the number of pending containers in the subCluster.
+      return headroomData.getTotalPendingCount();
+    }
+  }
+
   /**
    * This helper class is used to book-keep the requests made to each
    * subcluster, and maintain useful statistics to split ANY requests.
@@ -523,8 +712,9 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
 
     private void reinitialize(
         Map<SubClusterId, SubClusterInfo> activeSubclusters,
-        Set<SubClusterId> timedOutSubClusters) throws YarnException {
-      if (activeSubclusters == null) {
+        Set<SubClusterId> timedOutSubClusters, Configuration pConf) throws YarnException {
+
+      if (MapUtils.isEmpty(activeSubclusters)) {
         throw new YarnRuntimeException("null activeSubclusters received");
       }
 
@@ -548,10 +738,28 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
         }
       }
 
+      // subCluster blacklisting from configuration
+      String blacklistedSubClusters = pConf.get(FEDERATION_BLACKLIST_SUBCLUSTERS,
+          DEFAULT_FEDERATION_BLACKLIST_SUBCLUSTERS);
+      if (blacklistedSubClusters != null) {
+        Collection<String> tempList = StringUtils.getStringCollection(blacklistedSubClusters);
+        for (String item : tempList) {
+          activeAndEnabledSC.remove(SubClusterId.newInstance(item.trim()));
+        }
+      }
+
       if (activeAndEnabledSC.size() < 1) {
-        throw new NoActiveSubclustersException(
-            "None of the subclusters enabled in this policy (weight>0) are "
-                + "currently active we cannot forward the ResourceRequest(s)");
+        String errorMsg = "None of the subClusters enabled in this Policy (weight > 0) are "
+            + "currently active we cannot forward the ResourceRequest(s)";
+        if (failOnError) {
+          throw new NoActiveSubclustersException(errorMsg);
+        } else {
+          LOG.error(errorMsg + ", continuing by enabling all active subClusters.");
+          activeAndEnabledSC.addAll(activeSubclusters.keySet());
+          for (SubClusterId sc : activeSubclusters.keySet()) {
+            policyWeights.put(sc, 1.0f);
+          }
+        }
       }
 
       Set<SubClusterId> tmpSCSet = new HashSet<>(activeAndEnabledSC);
@@ -559,7 +767,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
 
       if (tmpSCSet.size() < 1) {
         LOG.warn("All active and enabled subclusters have expired last "
-            + "heartbeat time. Ignore the expiry check for this request");
+            + "heartbeat time. Ignore the expiry check for this request.");
       } else {
         activeAndEnabledSC = tmpSCSet;
       }

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSet.java

@@ -36,6 +36,8 @@ public class ResourceRequestSet {
 
   private ResourceRequestSetKey key;
   private int numContainers;
+  // Whether the ANY RR is relaxable
+  private boolean relaxable;
   // ResourceName -> RR
   private Map<String, ResourceRequest> asks;
 
@@ -49,6 +51,7 @@ public class ResourceRequestSet {
     this.key = key;
     // leave it zero for now, as if it is a cancel
     this.numContainers = 0;
+    this.relaxable = true;
     this.asks = new HashMap<>();
   }
 
@@ -61,6 +64,7 @@ public class ResourceRequestSet {
     this.key = other.key;
     this.numContainers = other.numContainers;
     this.asks = new HashMap<>();
+    this.relaxable = other.relaxable;
     // The assumption is that the RR objects should not be modified without
     // making a copy
     this.asks.putAll(other.asks);
@@ -86,6 +90,7 @@ public class ResourceRequestSet {
       // For G requestSet, update the numContainers only for ANY RR
       if (ask.getResourceName().equals(ResourceRequest.ANY)) {
         this.numContainers = ask.getNumContainers();
+        this.relaxable = ask.getRelaxLocality();
       }
     } else {
       // The assumption we made about O asks is that all RR in a requestSet has
@@ -182,6 +187,15 @@ public class ResourceRequestSet {
     }
   }
 
+  /**
+   * Whether the request set is relaxable at ANY level.
+   *
+   * @return whether the request set is relaxable at ANY level
+   */
+  public boolean isANYRelaxable() {
+    return this.relaxable;
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();

+ 48 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/ResourceRequestSetKey.java

@@ -18,11 +18,16 @@
 
 package org.apache.hadoop.yarn.server.scheduler;
 
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
 
 /**
  * The scheduler key for a group of {@link ResourceRequest}.
@@ -32,6 +37,9 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
  */
 public class ResourceRequestSetKey extends SchedulerRequestKey {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ResourceRequestSetKey.class);
+
   // More ResourceRequest key fields on top of SchedulerRequestKey
   private final Resource resource;
   private final ExecutionType execType;
@@ -123,6 +131,46 @@ public class ResourceRequestSetKey extends SchedulerRequestKey {
     return this.execType.compareTo(otherKey.execType);
   }
 
+  /**
+   * Extract the corresponding ResourceRequestSetKey for an allocated container
+   * from a given set. Return null if not found.
+   *
+   * @param container the allocated container
+   * @param keys the set of keys to look from
+   * @return ResourceRequestSetKey
+   */
+  public static ResourceRequestSetKey extractMatchingKey(Container container,
+      Set<ResourceRequestSetKey> keys) {
+    ResourceRequestSetKey resourceRequestSetKey = new ResourceRequestSetKey(
+        container.getAllocationRequestId(), container.getPriority(),
+        container.getResource(), container.getExecutionType());
+    if (keys.contains(resourceRequestSetKey)) {
+      return resourceRequestSetKey;
+    }
+
+    if (container.getAllocationRequestId() > 0) {
+      // If no exact match, look for the one with the same (non-zero)
+      // allocationRequestId
+      for (ResourceRequestSetKey candidate : keys) {
+        if (candidate.getAllocationRequestId() == container.getAllocationRequestId()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Using possible match for {} : {}", resourceRequestSetKey, candidate);
+          }
+          return candidate;
+        }
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("not match found for container {}.", container.getId());
+      for (ResourceRequestSetKey candidate : keys) {
+        LOG.debug("candidate set keys: {}.", candidate.toString());
+      }
+    }
+
+    return null;
+  }
+
   @Override
   public String toString() {
     return "[id:" + getAllocationRequestId() + " p:"

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java

@@ -140,7 +140,7 @@ public class UnmanagedApplicationManager {
     this.userUgi = null;
     // Relayer's rmClient will be set after the RM connection is created
     this.rmProxyRelayer =
-        new AMRMClientRelayer(null, this.applicationId, rmName);
+        new AMRMClientRelayer(null, this.applicationId, rmName, this.conf);
     this.heartbeatHandler = createAMHeartbeatRequestHandler(this.conf,
         this.applicationId, this.rmProxyRelayer);
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java

@@ -155,7 +155,7 @@ public class TestAMRMClientRelayer {
     this.conf = new Configuration();
 
     this.mockAMS = new MockApplicationMasterService();
-    this.relayer = new AMRMClientRelayer(this.mockAMS, null, "TEST");
+    this.relayer = new AMRMClientRelayer(this.mockAMS, null, "TEST", conf);
     this.relayer.registerApplicationMaster(
         RegisterApplicationMasterRequest.newInstance("", 0, ""));
 

+ 96 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java

@@ -32,6 +32,7 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -820,4 +821,99 @@ public class TestLocalityMulticastAMRMProxyPolicy
       return getHomeSubCluster();
     }
   }
+
+  /**
+   * Test the rerouting behavior when some subclusters are loaded. Make sure
+   * that the AMRMProxy rerouting decisions attempt to redirect requests
+   * to the least loaded subcluster when load thresholds are exceeded
+   */
+  @Test
+  public void testLoadBasedSubClusterReroute() throws YarnException {
+    int pendingThreshold = 1000;
+
+    LocalityMulticastAMRMProxyPolicy policy = (LocalityMulticastAMRMProxyPolicy) getPolicy();
+    initializePolicy();
+
+    SubClusterId sc0 = SubClusterId.newInstance("0");
+    SubClusterId sc1 = SubClusterId.newInstance("1");
+    SubClusterId sc2 = SubClusterId.newInstance("2");
+    SubClusterId sc3 = SubClusterId.newInstance("3");
+    SubClusterId sc4 = SubClusterId.newInstance("4");
+
+    Set<SubClusterId> scList = new HashSet<>();
+    scList.add(sc0);
+    scList.add(sc1);
+    scList.add(sc2);
+    scList.add(sc3);
+    scList.add(sc4);
+
+    // This cluster is the most overloaded - 4 times the threshold.
+    policy.notifyOfResponse(sc0,
+        getAllocateResponseWithEnhancedHeadroom(4 * pendingThreshold, 0));
+
+    // This cluster is the most overloaded - 4 times the threshold.
+    policy.notifyOfResponse(sc1,
+        getAllocateResponseWithEnhancedHeadroom(4 * pendingThreshold, 0));
+
+    // This cluster is 2 times the threshold, but not the most loaded.
+    policy.notifyOfResponse(sc2,
+        getAllocateResponseWithEnhancedHeadroom(2 * pendingThreshold, 0));
+
+    // This cluster is at the threshold, but not the most loaded.
+    policy.notifyOfResponse(sc3,
+        getAllocateResponseWithEnhancedHeadroom(pendingThreshold, 0));
+
+    // This cluster has zero pending.
+    policy.notifyOfResponse(sc4, getAllocateResponseWithEnhancedHeadroom(0, 0));
+
+    // sc2, sc3 and sc4 should just return the original subcluster.
+    Assert.assertEquals(
+        policy.routeNodeRequestIfNeeded(sc2, pendingThreshold, scList), sc2);
+    Assert.assertEquals(
+        policy.routeNodeRequestIfNeeded(sc3, pendingThreshold, scList), sc3);
+    Assert.assertEquals(
+        policy.routeNodeRequestIfNeeded(sc4, pendingThreshold, scList), sc4);
+
+    // sc0 and sc1 must select from sc0/sc1/sc2/sc3/sc4 according to weights
+    // 1/4, 1/4, 1/2, 1, 2. Let's run tons of random of samples, and verify that
+    // the proportion approximately holds.
+    Map<SubClusterId, Integer> counts = new HashMap<>();
+    counts.put(sc0, 0);
+    counts.put(sc1, 0);
+    counts.put(sc2, 0);
+    counts.put(sc3, 0);
+    counts.put(sc4, 0);
+
+    int n = 100000;
+    for (int i = 0; i < n; i++) {
+      SubClusterId selectedId = policy.routeNodeRequestIfNeeded(sc0, pendingThreshold, scList);
+      counts.put(selectedId, counts.get(selectedId) + 1);
+
+      selectedId = policy.routeNodeRequestIfNeeded(sc1, pendingThreshold, scList);
+      counts.put(selectedId, counts.get(selectedId) + 1);
+
+      // Also try a new SCId that's not active and enabled. Should be rerouted
+      // to sc0-4 with the same distribution as above
+      selectedId = policy.routeNodeRequestIfNeeded(SubClusterId.newInstance("10"),
+          pendingThreshold, scList);
+      counts.put(selectedId, counts.get(selectedId) + 1);
+    }
+
+    // The probability should be 1/16, 1/16, 1/8, 1/4, 1/2R
+    Assert.assertEquals((double) counts.get(sc0) / n / 3, 1 / 16.0, 0.01);
+    Assert.assertEquals((double) counts.get(sc1) / n / 3, 1 / 16.0, 0.01);
+    Assert.assertEquals((double) counts.get(sc2) / n / 3, 1 / 8.0, 0.01);
+    Assert.assertEquals((double) counts.get(sc3) / n / 3, 1 / 4.0, 0.01);
+    Assert.assertEquals((double) counts.get(sc4) / n / 3, 1 / 2.0, 0.01);
+
+    // Everything should be routed to these five active and enabled SCs
+    Assert.assertEquals(5, counts.size());
+  }
+
+  private AllocateResponse getAllocateResponseWithEnhancedHeadroom(int pending, int activeCores) {
+    return AllocateResponse.newInstance(0, null, null,
+        Collections.emptyList(), Resource.newInstance(0, 0), null, 10, null,
+        Collections.emptyList(), null, null, null,
+        EnhancedHeadroom.newInstance(pending, activeCores));
+  }
 }

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/metrics/TestAMRMClientRelayerMetrics.java

@@ -140,12 +140,12 @@ public class TestAMRMClientRelayerMetrics {
     this.mockAMS = new MockApplicationMasterService();
 
     this.homeRelayer = new AMRMClientRelayer(this.mockAMS,
-        ApplicationId.newInstance(0, 0), this.homeID);
+        ApplicationId.newInstance(0, 0), this.homeID, conf);
     this.homeRelayer.registerApplicationMaster(
         RegisterApplicationMasterRequest.newInstance("", 0, ""));
 
     this.uamRelayer = new AMRMClientRelayer(this.mockAMS,
-        ApplicationId.newInstance(0, 0), this.uamID);
+        ApplicationId.newInstance(0, 0), this.uamID, conf);
     this.uamRelayer.registerApplicationMaster(
         RegisterApplicationMasterRequest.newInstance("", 0, ""));
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java

@@ -321,7 +321,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
     this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
         ApplicationMasterProtocol.class, appOwner), appId,
-        this.homeSubClusterId.toString());
+        this.homeSubClusterId.toString(), conf);
 
     this.homeHeartbeatHandler =
         createHomeHeartbeatHandler(conf, appId, this.homeRMRelayer);