
YARN-8581. [AMRMProxy] Add sub-cluster timeout in LocalityMulticastAMRMProxyPolicy. Contributed by Botong Huang.

Giovanni Matteo Fumarola, 6 years ago
Commit e0f6ffdbad

+ 7 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -3209,8 +3209,14 @@ public class YarnConfiguration extends Configuration {
       "org.apache.hadoop.yarn.server.federation.resolver."
           + "DefaultSubClusterResolverImpl";
 
-  public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
+  // AMRMProxy split-merge timeout for active sub-clusters. We will not route
+  // new asks to expired sub-clusters.
+  public static final String FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =
+      FEDERATION_PREFIX + "amrmproxy.subcluster.timeout.ms";
+  public static final long DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =
+      60000; // one minute
 
+  public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
   public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX
       + "policy-manager";
 
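
The new key resolves to yarn.federation.amrmproxy.subcluster.timeout.ms (FEDERATION_PREFIX expands to "yarn.federation." in YarnConfiguration). A minimal sketch of overriding it programmatically, mirroring what the new test below does; the 30-second value is purely illustrative, not a recommendation:

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class SubClusterTimeoutConfigSketch {
      public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();
        // Stop routing new asks to sub-clusters whose last successful
        // heartbeat is older than 30 seconds (illustrative value).
        conf.setLong(
            YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
            30 * 1000L);
      }
    }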

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java

@@ -105,6 +105,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
         .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER);
     configurationPropsToSkipCompare
         .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
 
     // Federation StateStore ZK implementation configs to be ignored
     configurationPropsToSkipCompare.add(

+ 59 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java

@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@@ -126,6 +127,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
   private SubClusterResolver resolver;
 
   private Map<SubClusterId, Resource> headroom;
+  private Map<SubClusterId, Long> lastHeartbeatTimeStamp;
+  private long subClusterTimeOut;
   private float hrAlpha;
   private FederationStateStoreFacade federationFacade;
   private AllocationBookkeeper bookkeeper;
@@ -178,6 +181,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
 
     if (headroom == null) {
       headroom = new ConcurrentHashMap<>();
+      lastHeartbeatTimeStamp = new ConcurrentHashMap<>();
     }
     hrAlpha = policy.getHeadroomAlpha();
 
@@ -185,13 +189,29 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
         policyContext.getFederationStateStoreFacade();
     this.homeSubcluster = policyContext.getHomeSubcluster();
 
+    this.subClusterTimeOut = this.federationFacade.getConf().getLong(
+        YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
+        YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
+    if (this.subClusterTimeOut <= 0) {
+      LOG.info(
+          "{} configured to be {}, should be positive. Using default of {}.",
+          YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
+          this.subClusterTimeOut,
+          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
+      this.subClusterTimeOut =
+          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
+    }
   }
 
   @Override
   public void notifyOfResponse(SubClusterId subClusterId,
       AllocateResponse response) throws YarnException {
-    // stateless policy does not care about responses except tracking headroom
-    headroom.put(subClusterId, response.getAvailableResources());
+    if (response.getAvailableResources() != null) {
+      headroom.put(subClusterId, response.getAvailableResources());
+      LOG.info("Subcluster {} updated with {} memory headroom", subClusterId,
+          response.getAvailableResources().getMemorySize());
+    }
+    lastHeartbeatTimeStamp.put(subClusterId, System.currentTimeMillis());
   }
 
   @Override
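
notifyOfResponse now records a per-sub-cluster timestamp on every allocate response, and only updates headroom when the response actually carries one. The split logic below treats a sub-cluster as expired once that timestamp is older than subClusterTimeOut; a hypothetical helper method inside the policy class (not part of the patch) that captures the predicate:

    // Hypothetical helper (not in the patch): true if this sub-cluster's
    // last recorded heartbeat is older than the configured timeout.
    // Untracked sub-clusters are not considered expired.
    private boolean isExpired(SubClusterId subClusterId) {
      Long last = lastHeartbeatTimeStamp.get(subClusterId);
      return last != null
          && System.currentTimeMillis() - last > subClusterTimeOut;
    }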
@@ -281,6 +301,15 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
     // handle all non-localized requests (ANY)
     splitAnyRequests(nonLocalizedRequests, bookkeeper);
 
+    for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : bookkeeper
+        .getAnswer().entrySet()) {
+      // A new sub-cluster here will trigger a new UAM launch, which might
+      // take a long time. We don't want too many requests stuck in this UAM
+      // before it is ready and starts heartbeating.
+      if (!lastHeartbeatTimeStamp.containsKey(entry.getKey())) {
+        lastHeartbeatTimeStamp.put(entry.getKey(), System.currentTimeMillis());
+      }
+    }
     return bookkeeper.getAnswer();
   }
 
@@ -519,13 +548,10 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
       policyWeights = weights;
       totPolicyWeight = 0;
 
-      // pre-compute the set of subclusters that are both active and enabled by
-      // the policy weights, and accumulate their total weight
       for (Map.Entry<SubClusterId, Float> entry : policyWeights.entrySet()) {
         if (entry.getValue() > 0
             && activeSubclusters.containsKey(entry.getKey())) {
           activeAndEnabledSC.add(entry.getKey());
-          totPolicyWeight += entry.getValue();
         }
       }
 
@@ -535,6 +561,34 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
                 + "currently active we cannot forward the ResourceRequest(s)");
       }
 
+      Set<SubClusterId> tmpSCSet = new HashSet<>(activeAndEnabledSC);
+      for (Map.Entry<SubClusterId, Long> entry : lastHeartbeatTimeStamp
+          .entrySet()) {
+        long duration = System.currentTimeMillis() - entry.getValue();
+        if (duration > subClusterTimeOut) {
+          LOG.warn(
+              "Subcluster {} does not have a success heartbeat for {}s, "
+                  + "skip routing asks there for this request",
+              entry.getKey(), (double) duration / 1000);
+          tmpSCSet.remove(entry.getKey());
+        }
+      }
+      if (tmpSCSet.size() < 1) {
+        LOG.warn("All active and enabled subclusters have expired "
+            + "heartbeats. Ignoring the expiry check for this request");
+      } else {
+        activeAndEnabledSC = tmpSCSet;
+      }
+
+      LOG.info("{} subclusters active, {} active and enabled",
+          activeSubclusters.size(), activeAndEnabledSC.size());
+
+      // accumulate the total weight of the subclusters that remain active
+      // and enabled after the expiry filtering above
+      for (SubClusterId sc : activeAndEnabledSC) {
+        totPolicyWeight += policyWeights.get(sc);
+      }
+
       // pre-compute headroom-based weights for active/enabled subclusters
       for (Map.Entry<SubClusterId, Resource> r : headroom.entrySet()) {
         if (activeAndEnabledSC.contains(r.getKey())) {
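
The expiry filter deliberately falls back to the unfiltered set when every active and enabled sub-cluster has expired, degrading the request rather than failing it. A self-contained sketch of that pattern, with hypothetical names and String ids standing in for SubClusterId:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class ExpiryFilterSketch {
      // Drop entries whose last heartbeat is older than timeoutMs; if that
      // would leave nothing, keep the original set (skip the expiry check).
      static Set<String> filterExpired(Set<String> activeAndEnabled,
          Map<String, Long> lastHeartbeat, long timeoutMs, long nowMs) {
        Set<String> result = new HashSet<>(activeAndEnabled);
        for (Map.Entry<String, Long> e : lastHeartbeat.entrySet()) {
          if (nowMs - e.getValue() > timeoutMs) {
            result.remove(e.getKey());
          }
        }
        return result.isEmpty() ? activeAndEnabled : result;
      }

      public static void main(String[] args) {
        Map<String, Long> heartbeats = new HashMap<>();
        heartbeats.put("sc0", 100L);  // stale: 1000 - 100 > 500
        heartbeats.put("sc2", 900L);  // fresh
        Set<String> active = new HashSet<>(Arrays.asList("sc0", "sc2"));
        System.out.println(filterExpired(active, heartbeats, 500L, 1000L));
        // Prints [sc2]
      }
    }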

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java

@@ -391,6 +391,15 @@ public final class FederationStateStoreFacade {
     return this.subclusterResolver;
   }
 
+  /**
+   * Get the configuration.
+   *
+   * @return configuration object
+   */
+  public Configuration getConf() {
+    return this.conf;
+  }
+
   /**
    * Helper method to create instances of Object using the class name defined in
    * the configuration object. The instances creates {@link RetryProxy} using

+ 81 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java

@@ -32,11 +32,13 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@@ -106,6 +108,10 @@ public class TestLocalityMulticastAMRMProxyPolicy
   }
 
   private void initializePolicy() throws YarnException {
+    initializePolicy(new YarnConfiguration());
+  }
+
+  private void initializePolicy(Configuration conf) throws YarnException {
     setFederationPolicyContext(new FederationPolicyInitializationContext());
     SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver();
     getFederationPolicyContext().setFederationSubclusterResolver(resolver);
@@ -116,7 +122,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
     getFederationPolicyContext().setHomeSubcluster(getHomeSubCluster());
     FederationPoliciesTestUtil.initializePolicyContext(
         getFederationPolicyContext(), getPolicy(), getPolicyInfo(),
-        getActiveSubclusters());
+        getActiveSubclusters(), conf);
   }
 
   @Test(expected = FederationPolicyInitializationException.class)
@@ -145,7 +151,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
     initializePolicy();
     List<ResourceRequest> resourceRequests = createSimpleRequest();
 
-    prepPolicyWithHeadroom();
+    prepPolicyWithHeadroom(true);
 
     Map<SubClusterId, List<ResourceRequest>> response =
         ((FederationAMRMProxyPolicy) getPolicy())
@@ -205,7 +211,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
     int numRR = 1000;
     List<ResourceRequest> resourceRequests = createLargeRandomList(numRR);
 
-    prepPolicyWithHeadroom();
+    prepPolicyWithHeadroom(true);
 
     int numIterations = 1000;
     long tstart = System.currentTimeMillis();
@@ -233,7 +239,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
     List<ResourceRequest> resourceRequests = createZeroSizedANYRequest();
 
     // this receives responses from sc0,sc1,sc2
-    prepPolicyWithHeadroom();
+    prepPolicyWithHeadroom(true);
 
     Map<SubClusterId, List<ResourceRequest>> response =
         ((FederationAMRMProxyPolicy) getPolicy())
@@ -269,7 +275,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
     initializePolicy();
     List<ResourceRequest> resourceRequests = createSimpleRequest();
 
-    prepPolicyWithHeadroom();
+    prepPolicyWithHeadroom(true);
 
     Map<SubClusterId, List<ResourceRequest>> response =
         ((FederationAMRMProxyPolicy) getPolicy())
@@ -292,10 +298,14 @@ public class TestLocalityMulticastAMRMProxyPolicy
     checkTotalContainerAllocation(response, 100);
   }
 
-  private void prepPolicyWithHeadroom() throws YarnException {
+  private void prepPolicyWithHeadroom(boolean setSubCluster0)
+      throws YarnException {
     AllocateResponse ar = getAllocateResponseWithTargetHeadroom(40);
-    ((FederationAMRMProxyPolicy) getPolicy())
-        .notifyOfResponse(SubClusterId.newInstance("subcluster0"), ar);
+
+    if (setSubCluster0) {
+      ((FederationAMRMProxyPolicy) getPolicy())
+          .notifyOfResponse(SubClusterId.newInstance("subcluster0"), ar);
+    }
 
     ar = getAllocateResponseWithTargetHeadroom(0);
     ((FederationAMRMProxyPolicy) getPolicy())
@@ -333,7 +343,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
 
     FederationPoliciesTestUtil.initializePolicyContext(
         getFederationPolicyContext(), getPolicy(), getPolicyInfo(),
-        getActiveSubclusters());
+        getActiveSubclusters(), new Configuration());
 
     List<ResourceRequest> resourceRequests = createComplexRequest();
 
@@ -669,7 +679,7 @@ public class TestLocalityMulticastAMRMProxyPolicy
     List<ResourceRequest> resourceRequests = new ArrayList<>();
 
     // Initialize the headroom map
-    prepPolicyWithHeadroom();
+    prepPolicyWithHeadroom(true);
 
     // Cancel at ANY level only
     resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L,
@@ -716,4 +726,65 @@ public class TestLocalityMulticastAMRMProxyPolicy
     checkExpectedAllocation(response, "subcluster5", 1, 25);
     checkTotalContainerAllocation(response, 100);
   }
+
+  @Test
+  public void testSubClusterExpiry() throws Exception {
+
+    // Tests that no new asks are routed to sub-clusters whose last
+    // successful heartbeat has expired.
+    // Configure policy to be 100% headroom based
+    getPolicyInfo().setHeadroomAlpha(1.0f);
+
+    YarnConfiguration conf = new YarnConfiguration();
+    // Set expiry to 500ms
+    conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
+        500);
+
+    initializePolicy(conf);
+    List<ResourceRequest> resourceRequests = createSimpleRequest();
+
+    // Update the response timestamp for the first time
+    prepPolicyWithHeadroom(true);
+
+    Map<SubClusterId, List<ResourceRequest>> response =
+        ((FederationAMRMProxyPolicy) getPolicy())
+            .splitResourceRequests(resourceRequests);
+
+    // pretty print requests
+    prettyPrintRequests(response);
+
+    validateSplit(response, resourceRequests);
+
+    /*
+     * Based on headroom, we expect 75 containers to go to subcluster0 (60)
+     * and subcluster2 (15) according to the advertised headroom (40 and 10),
+     * no containers for subcluster1 as it advertises zero headroom, and 25
+     * to subcluster5, which has unknown headroom and so gets 1/4th of the load.
+     */
+    checkExpectedAllocation(response, "subcluster0", 1, 60);
+    checkExpectedAllocation(response, "subcluster1", 1, -1);
+    checkExpectedAllocation(response, "subcluster2", 1, 15);
+    checkExpectedAllocation(response, "subcluster5", 1, 25);
+    checkTotalContainerAllocation(response, 100);
+
+    Thread.sleep(800);
+
+    // Update the response timestamp for the second time, skipping sc0 and sc5
+    prepPolicyWithHeadroom(false);
+
+    response = ((FederationAMRMProxyPolicy) getPolicy())
+        .splitResourceRequests(resourceRequests);
+
+    // pretty print requests
+    prettyPrintRequests(response);
+
+    validateSplit(response, resourceRequests);
+
+    checkExpectedAllocation(response, "subcluster0", 1, -1);
+    checkExpectedAllocation(response, "subcluster1", 1, -1);
+    checkExpectedAllocation(response, "subcluster2", 1, 100);
+    checkExpectedAllocation(response, "subcluster5", 1, -1);
+    checkTotalContainerAllocation(response, 100);
+  }
+
 }
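
After the 800 ms sleep, only the sub-clusters refreshed by prepPolicyWithHeadroom(false) still have heartbeats within the 500 ms timeout; subcluster1 remains fresh but advertises zero headroom, so with headroomAlpha at 1.0 all 100 containers route to subcluster2, which the final assertions verify.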

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java

@@ -117,7 +117,7 @@ public final class FederationPoliciesTestUtil {
   public static void initializePolicyContext(
       FederationPolicyInitializationContext fpc, ConfigurableFederationPolicy
       policy, WeightedPolicyInfo policyInfo,
-      Map<SubClusterId, SubClusterInfo> activeSubclusters)
+      Map<SubClusterId, SubClusterInfo> activeSubclusters, Configuration conf)
       throws YarnException {
     ByteBuffer buf = policyInfo.toByteBuffer();
     fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration
@@ -133,7 +133,7 @@ public final class FederationPoliciesTestUtil {
         .newInstance(new ArrayList<SubClusterInfo>(activeSubclusters.values()));
 
     when(fss.getSubClusters(any())).thenReturn(response);
-    facade.reinitialize(fss, new Configuration());
+    facade.reinitialize(fss, conf);
     fpc.setFederationStateStoreFacade(facade);
     policy.reinitialize(fpc);
   }
@@ -155,7 +155,8 @@ public final class FederationPoliciesTestUtil {
     FederationPolicyInitializationContext context =
         new FederationPolicyInitializationContext(null, initResolver(),
             initFacade(), SubClusterId.newInstance(subclusterId));
-    initializePolicyContext(context, policy, policyInfo, activeSubclusters);
+    initializePolicyContext(context, policy, policyInfo, activeSubclusters,
+        new Configuration());
   }
 
   /**