Selaa lähdekoodia

YARN-8897. LoadBasedRouterPolicy throws NPE in case of sub cluster unavailability. Contributed by Bilwa S T.

Giovanni Matteo Fumarola 6 vuotta sitten
vanhempi
commit
aed836efbf

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@@ -95,7 +96,10 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
         }
       }
     }
-
+    if (chosen == null) {
+      throw new FederationPolicyException(
+          "Zero Active Subcluster with weight 1.");
+    }
     return chosen.toId();
   }
 

+ 31 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java

@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
+import static org.junit.Assert.fail;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -103,4 +105,33 @@ public class TestLoadBasedRouterPolicy extends BaseRouterPoliciesTest {
     Assert.assertEquals("sc05", chosen.getId());
   }
 
+  @Test
+  public void testIfNoSubclustersWithWeightOne() {
+    setPolicy(new LoadBasedRouterPolicy());
+    setPolicyInfo(new WeightedPolicyInfo());
+    Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
+    Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
+    // update subcluster with weight 0
+    SubClusterIdInfo sc = new SubClusterIdInfo(String.format("sc%02d", 0));
+    SubClusterInfo federationSubClusterInfo = SubClusterInfo.newInstance(
+        sc.toId(), null, null, null, null, -1, SubClusterState.SC_RUNNING, -1,
+        generateClusterMetricsInfo(0));
+    getActiveSubclusters().clear();
+    getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
+    routerWeights.put(sc, 0.0f);
+    amrmWeights.put(sc, 0.0f);
+    getPolicyInfo().setRouterPolicyWeights(routerWeights);
+    getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
+
+    try {
+      FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
+          getPolicyInfo(), getActiveSubclusters());
+      ((FederationRouterPolicy) getPolicy())
+          .getHomeSubcluster(getApplicationSubmissionContext(), null);
+      fail();
+    } catch (YarnException ex) {
+      Assert.assertTrue(
+          ex.getMessage().contains("Zero Active Subcluster with weight 1"));
+    }
+  }
 }