Browse Source

YARN-8481. AMRMProxyPolicies should accept heartbeat response from new/unknown subclusters. Contributed by Botong Huang.

Giovanni Matteo Fumarola 6 năm trước cách đây
mục cha
commit
cdb084426b

+ 0 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java

@@ -19,10 +19,8 @@
 package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -30,7 +28,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.UnknownSubclusterException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 
@@ -40,8 +37,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
  */
 public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
 
-  private Set<SubClusterId> knownClusterIds = new HashSet<>();
-
   @Override
   public void reinitialize(
       FederationPolicyInitializationContext policyContext)
@@ -65,7 +60,6 @@ public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
     // simply broadcast the resource request to all sub-clusters
     for (SubClusterId subClusterId : activeSubclusters.keySet()) {
       answer.put(subClusterId, resourceRequests);
-      knownClusterIds.add(subClusterId);
     }
 
     return answer;
@@ -74,11 +68,6 @@ public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
   @Override
   public void notifyOfResponse(SubClusterId subClusterId,
       AllocateResponse response) throws YarnException {
-    if (!knownClusterIds.contains(subClusterId)) {
-      throw new UnknownSubclusterException(
-          "The response is received from a subcluster that is unknown to this "
-              + "policy.");
-    }
     // stateless policy does not care about responses
   }
 

+ 0 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/RejectAMRMProxyPolicy.java

@@ -18,10 +18,8 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
 
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -38,8 +36,6 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
  */
 public class RejectAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
 
-  private Set<SubClusterId> knownClusterIds = new HashSet<>();
-
   @Override
   public void reinitialize(FederationPolicyInitializationContext policyContext)
       throws FederationPolicyInitializationException {

+ 3 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestBroadcastAMRMProxyFederationPolicy.java

@@ -89,7 +89,7 @@ public class TestBroadcastAMRMProxyFederationPolicy
   }
 
   @Test
-  public void testNotifyOfResponse() throws Exception {
+  public void testNotifyOfResponseFromUnknownSubCluster() throws Exception {
     String[] hosts = new String[] {"host1", "host2" };
     List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
         .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
@@ -97,13 +97,8 @@ public class TestBroadcastAMRMProxyFederationPolicy
         ((FederationAMRMProxyPolicy) getPolicy())
             .splitResourceRequests(resourceRequests);
 
-    try {
-      ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
-          SubClusterId.newInstance("sc3"), mock(AllocateResponse.class));
-      Assert.fail();
-    } catch (FederationPolicyException f) {
-      System.out.println("Expected: " + f.getMessage());
-    }
+    ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
+        SubClusterId.newInstance("sc3"), mock(AllocateResponse.class));
 
     ((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
         SubClusterId.newInstance("sc1"), mock(AllocateResponse.class));