Browse Source

YARN-5676. Add a HashBasedRouterPolicy, and small policies and test refactoring. (Carlo Curino via Subru).

Subru Krishnan 8 years ago
parent
commit
575137f41c
29 changed files with 414 additions and 52 deletions
  1. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  2. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
  3. 16 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java
  4. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java
  5. 38 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java
  6. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java
  7. 3 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java
  8. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java
  9. 19 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java
  10. 19 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
  11. 81 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
  12. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
  13. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
  14. 6 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
  15. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
  16. 15 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
  17. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
  18. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java
  19. 8 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java
  20. 40 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestHashBasedBroadcastPolicyManager.java
  21. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestPriorityBroadcastPolicyManager.java
  22. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestUniformBroadcastPolicyManager.java
  23. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedLocalityPolicyManager.java
  24. 51 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
  25. 83 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java
  26. 1 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
  27. 1 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
  28. 1 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java
  29. 10 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -2600,7 +2600,8 @@ public class YarnConfiguration extends Configuration {
       + "policy-manager";
 
   public static final String DEFAULT_FEDERATION_POLICY_MANAGER = "org.apache"
-      + ".hadoop.yarn.server.federation.policies.UniformBroadcastPolicyManager";
+      + ".hadoop.yarn.server.federation.policies"
+      + ".manager.UniformBroadcastPolicyManager";
 
   public static final String FEDERATION_POLICY_MANAGER_PARAMS =
       FEDERATION_PREFIX + "policy-manager-params";

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
 import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;

+ 16 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java

@@ -15,8 +15,11 @@
  * the License.
  */
 
-package org.apache.hadoop.yarn.server.federation.policies;
+package org.apache.hadoop.yarn.server.federation.policies.manager;
 
+import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
@@ -24,6 +27,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyCo
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
+
 /**
  * This class provides basic implementation for common methods that multiple
  * policies will need to implement.
@@ -112,6 +117,16 @@ public abstract class AbstractPolicyManager implements
     }
   }
 
+  @Override
+  public SubClusterPolicyConfiguration serializeConf()
+      throws FederationPolicyInitializationException {
+    // default implementation works only for sub-classes which do not require
+    // any parameters
+    ByteBuffer buf = ByteBuffer.allocate(0);
+    return SubClusterPolicyConfiguration
+        .newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
+  }
+
   @Override
   public String getQueue() {
     return queue;

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java

@@ -15,8 +15,9 @@
  * the License.
  */
 
-package org.apache.hadoop.yarn.server.federation.policies;
+package org.apache.hadoop.yarn.server.federation.policies.manager;
 
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;

+ 38 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.manager;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.router.HashBasedRouterPolicy;
+
+/**
+ * Policy that routes applications via hashing of their queuename, and broadcast
+ * resource requests. This picks a {@link HashBasedRouterPolicy} for the router
+ * and a {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they are designed
+ * to work together.
+ */
+public class HashBroadcastPolicyManager extends AbstractPolicyManager {
+
+  public HashBroadcastPolicyManager() {
+    // this structurally hard-codes two compatible policies for Router and
+    // AMRMProxy.
+    routerFederationPolicy = HashBasedRouterPolicy.class;
+    amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
+  }
+
+}

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java

@@ -15,7 +15,7 @@
  * the License.
  */
 
-package org.apache.hadoop.yarn.server.federation.policies;
+package org.apache.hadoop.yarn.server.federation.policies.manager;
 
 import java.nio.ByteBuffer;
 

+ 3 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java

@@ -15,14 +15,10 @@
  * the License.
  */
 
-package org.apache.hadoop.yarn.server.federation.policies;
+package org.apache.hadoop.yarn.server.federation.policies.manager;
 
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-
-import java.nio.ByteBuffer;
 
 /**
  * This class represents a simple implementation of a {@code
@@ -36,21 +32,13 @@ import java.nio.ByteBuffer;
  * containers than a job requested as all requests are (replicated and)
  * broadcasted.
  */
-public class UniformBroadcastPolicyManager
-    extends AbstractPolicyManager {
+public class UniformBroadcastPolicyManager extends AbstractPolicyManager {
 
   public UniformBroadcastPolicyManager() {
-    //this structurally hard-codes two compatible policies for Router and
+    // this structurally hard-codes two compatible policies for Router and
     // AMRMProxy.
     routerFederationPolicy = UniformRandomRouterPolicy.class;
     amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
   }
 
-  @Override
-  public SubClusterPolicyConfiguration serializeConf()
-      throws FederationPolicyInitializationException {
-    ByteBuffer buf = ByteBuffer.allocate(0);
-    return SubClusterPolicyConfiguration
-        .newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
-  }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java

@@ -15,7 +15,7 @@
  * the License.
  */
 
-package org.apache.hadoop.yarn.server.federation.policies;
+package org.apache.hadoop.yarn.server.federation.policies.manager;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java

@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Various implementation of FederationPolicyManager. **/
+package org.apache.hadoop.yarn.server.federation.policies.manager;

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java

@@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.federation.policies.router;
 
 import java.util.Map;
 
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 
@@ -44,4 +47,20 @@ public abstract class AbstractRouterPolicy extends
     }
   }
 
+  public void validate(ApplicationSubmissionContext appSubmissionContext)
+      throws FederationPolicyException {
+
+    if (appSubmissionContext == null) {
+      throw new FederationPolicyException(
+          "Cannot route an application with null context.");
+    }
+
+    // if the queue is not specified we set it to default value, to be
+    // compatible with YARN behavior.
+    String queue = appSubmissionContext.getQueue();
+    if (queue == null) {
+      appSubmissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+    }
+  }
+
 }

+ 81 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+/**
+ * This {@link FederationRouterPolicy} pick a subcluster based on the hash of
+ * the job's queue name. Useful to provide a default behavior when too many
+ * queues exist in a system. This also ensures that all jobs belonging to a
+ * queue are mapped to the same sub-cluster (likely help with locality).
+ */
+public class HashBasedRouterPolicy extends AbstractRouterPolicy {
+
+  @Override
+  public void reinitialize(
+      FederationPolicyInitializationContext federationPolicyContext)
+      throws FederationPolicyInitializationException {
+    FederationPolicyInitializationContextValidator
+        .validate(federationPolicyContext, this.getClass().getCanonicalName());
+
+    // note: this overrides BaseRouterPolicy and ignores the weights
+    setPolicyContext(federationPolicyContext);
+  }
+
+  /**
+   * Simply picks from alphabetically-sorted active subclusters based on the
+   * hash of quey name. Jobs of the same queue will all be routed to the same
+   * sub-cluster, as far as the number of active sub-cluster and their names
+   * remain the same.
+   *
+   * @param appSubmissionContext the context for the app being submitted.
+   *
+   * @return a hash-based chosen subcluster.
+   *
+   * @throws YarnException if there are no active subclusters.
+   */
+  public SubClusterId getHomeSubcluster(
+      ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+
+    // throws if no active subclusters available
+    Map<SubClusterId, SubClusterInfo> activeSubclusters =
+        getActiveSubclusters();
+
+    validate(appSubmissionContext);
+
+    int chosenPosition = Math.abs(
+        appSubmissionContext.getQueue().hashCode() % activeSubclusters.size());
+
+    List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
+    Collections.sort(list);
+    return list.get(chosenPosition);
+  }
+
+}

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java

@@ -64,6 +64,9 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
   public SubClusterId getHomeSubcluster(
       ApplicationSubmissionContext appSubmissionContext) throws YarnException {
 
+    // null checks and default-queue behavior
+    validate(appSubmissionContext);
+
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java

@@ -36,6 +36,9 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {
   public SubClusterId getHomeSubcluster(
       ApplicationSubmissionContext appSubmissionContext) throws YarnException {
 
+    // null checks and default-queue behavior
+    validate(appSubmissionContext);
+
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 

+ 6 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java

@@ -48,11 +48,10 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
   }
 
   @Override
-  public void reinitialize(
-      FederationPolicyInitializationContext policyContext)
+  public void reinitialize(FederationPolicyInitializationContext policyContext)
       throws FederationPolicyInitializationException {
-    FederationPolicyInitializationContextValidator
-        .validate(policyContext, this.getClass().getCanonicalName());
+    FederationPolicyInitializationContextValidator.validate(policyContext,
+        this.getClass().getCanonicalName());
 
     // note: this overrides AbstractRouterPolicy and ignores the weights
 
@@ -73,6 +72,9 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
   public SubClusterId getHomeSubcluster(
       ApplicationSubmissionContext appSubmissionContext) throws YarnException {
 
+    // null checks and default-queue behavior
+    validate(appSubmissionContext);
+
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java

@@ -43,6 +43,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
   public SubClusterId getHomeSubcluster(
       ApplicationSubmissionContext appSubmissionContext) throws YarnException {
 
+    // null checks and default-queue behavior
+    validate(appSubmissionContext);
+
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 

+ 15 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.federation.policies;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
@@ -35,8 +36,10 @@ import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPo
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
 import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
 import org.junit.Test;
 
@@ -46,7 +49,7 @@ import org.junit.Test;
 public abstract class BaseFederationPoliciesTest {
 
   private ConfigurableFederationPolicy policy;
-  private WeightedPolicyInfo policyInfo;
+  private WeightedPolicyInfo policyInfo = mock(WeightedPolicyInfo.class);
   private Map<SubClusterId, SubClusterInfo> activeSubclusters = new HashMap<>();
   private FederationPolicyInitializationContext federationPolicyContext;
   private ApplicationSubmissionContext applicationSubmissionContext =
@@ -103,7 +106,7 @@ public abstract class BaseFederationPoliciesTest {
       ((FederationRouterPolicy) localPolicy)
           .getHomeSubcluster(getApplicationSubmissionContext());
     } else {
-      String[] hosts = new String[] {"host1", "host2" };
+      String[] hosts = new String[] {"host1", "host2"};
       List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
           .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
       ((FederationAMRMProxyPolicy) localPolicy)
@@ -170,4 +173,14 @@ public abstract class BaseFederationPoliciesTest {
     this.homeSubCluster = homeSubCluster;
   }
 
+  public void setMockActiveSubclusters(int numSubclusters) {
+    for (int i = 1; i <= numSubclusters; i++) {
+      SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
+      SubClusterInfo sci = mock(SubClusterInfo.class);
+      when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
+      when(sci.getSubClusterId()).thenReturn(sc.toId());
+      getActiveSubclusters().put(sc.toId(), sci);
+    }
+  }
+
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java

@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
 import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java

@@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.PriorityBroadcastPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
 import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;

+ 8 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java

@@ -15,8 +15,9 @@
  * the License.
  */
 
-package org.apache.hadoop.yarn.server.federation.policies;
+package org.apache.hadoop.yarn.server.federation.policies.manager;
 
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
@@ -32,7 +33,6 @@ import org.junit.Test;
  */
 public abstract class BasePolicyManagerTest {
 
-
   @SuppressWarnings("checkstyle:visibilitymodifier")
   protected FederationPolicyManager wfp = null;
   @SuppressWarnings("checkstyle:visibilitymodifier")
@@ -42,12 +42,10 @@ public abstract class BasePolicyManagerTest {
   @SuppressWarnings("checkstyle:visibilitymodifier")
   protected Class expectedRouterPolicy;
 
-
   @Test
   public void testSerializeAndInstantiate() throws Exception {
     serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
-        expectedAMRMProxyPolicy,
-        expectedRouterPolicy);
+        expectedAMRMProxyPolicy, expectedRouterPolicy);
   }
 
   @Test(expected = FederationPolicyInitializationException.class)
@@ -73,11 +71,10 @@ public abstract class BasePolicyManagerTest {
       Class expAMRMProxyPolicy, Class expRouterPolicy) throws Exception {
 
     // serializeConf it in a context
-    SubClusterPolicyConfiguration fpc =
-        wfp.serializeConf();
+    SubClusterPolicyConfiguration fpc = wfp.serializeConf();
     fpc.setType(policyManagerType.getCanonicalName());
-    FederationPolicyInitializationContext context = new
-        FederationPolicyInitializationContext();
+    FederationPolicyInitializationContext context =
+        new FederationPolicyInitializationContext();
     context.setSubClusterPolicyConfiguration(fpc);
     context
         .setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
@@ -92,7 +89,7 @@ public abstract class BasePolicyManagerTest {
     FederationAMRMProxyPolicy federationAMRMProxyPolicy =
         wfp2.getAMRMPolicy(context, null);
 
-    //needed only for tests (getARMRMPolicy change the "type" in conf)
+    // needed only for tests (getARMRMPolicy change the "type" in conf)
     fpc.setType(wfp.getClass().getCanonicalName());
 
     FederationRouterPolicy federationRouterPolicy =
@@ -101,8 +98,7 @@ public abstract class BasePolicyManagerTest {
     Assert.assertEquals(federationAMRMProxyPolicy.getClass(),
         expAMRMProxyPolicy);
 
-    Assert.assertEquals(federationRouterPolicy.getClass(),
-        expRouterPolicy);
+    Assert.assertEquals(federationRouterPolicy.getClass(), expRouterPolicy);
   }
 
 }

+ 40 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestHashBasedBroadcastPolicyManager.java

@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.manager;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.router.HashBasedRouterPolicy;
+import org.junit.Before;
+
+/**
+ * Simple test of {@link HashBroadcastPolicyManager}.
+ */
+public class TestHashBasedBroadcastPolicyManager extends BasePolicyManagerTest {
+
+  @Before
+  public void setup() {
+    // config policy
+    wfp = new HashBroadcastPolicyManager();
+    wfp.setQueue("queue1");
+
+    // set expected params that the base test class will use for tests
+    expectedPolicyManager = HashBroadcastPolicyManager.class;
+    expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class;
+    expectedRouterPolicy = HashBasedRouterPolicy.class;
+  }
+}

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestPriorityBroadcastPolicyManager.java

@@ -15,7 +15,7 @@
  * the License.
  */
 
-package org.apache.hadoop.yarn.server.federation.policies;
+package org.apache.hadoop.yarn.server.federation.policies.manager;
 
 import java.util.HashMap;
 import java.util.Map;

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestUniformBroadcastPolicyManager.java

@@ -15,7 +15,7 @@
  * the License.
  */
 
-package org.apache.hadoop.yarn.server.federation.policies;
+package org.apache.hadoop.yarn.server.federation.policies.manager;
 
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedLocalityPolicyManager.java

@@ -15,7 +15,7 @@
  * the License.
  */
 
-package org.apache.hadoop.yarn.server.federation.policies;
+package org.apache.hadoop.yarn.server.federation.policies.manager;
 
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;

+ 51 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java

@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Base class for router policies tests, tests for null input cases.
+ */
+public abstract class BaseRouterPoliciesTest
+    extends BaseFederationPoliciesTest {
+
+  @Test
+  public void testNullQueueRouting() throws YarnException {
+    FederationRouterPolicy localPolicy = (FederationRouterPolicy) getPolicy();
+    ApplicationSubmissionContext applicationSubmissionContext =
+        ApplicationSubmissionContext.newInstance(null, null, null, null, null,
+            false, false, 0, Resources.none(), null, false, null, null);
+    SubClusterId chosen =
+        localPolicy.getHomeSubcluster(applicationSubmissionContext);
+    Assert.assertNotNull(chosen);
+  }
+
+  @Test(expected = FederationPolicyException.class)
+  public void testNullAppContext() throws YarnException {
+    ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(null);
+  }
+}

+ 83 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java

@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Simple test class for the {@link HashBasedRouterPolicy}. Tests that one of
+ * the active sub-cluster is chosen.
+ */
+public class TestHashBasedRouterPolicy extends BaseRouterPoliciesTest {
+
+  private int numSubclusters = 10;
+
+  @Before
+  public void setUp() throws Exception {
+
+    // set policy in base class
+    setPolicy(new HashBasedRouterPolicy());
+
+    // setting up the active sub-clusters for this test
+    setMockActiveSubclusters(numSubclusters);
+
+    // initialize policy with context
+    FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
+        getPolicyInfo(), getActiveSubclusters());
+  }
+
+  @Test
+  public void testHashSpreadUniformlyAmongSubclusters() throws YarnException {
+    SubClusterId chosen;
+
+    Map<SubClusterId, AtomicLong> counter = new HashMap<>();
+    for (SubClusterId id : getActiveSubclusters().keySet()) {
+      counter.put(id, new AtomicLong(0));
+    }
+
+    long jobPerSub = 100;
+
+    ApplicationSubmissionContext applicationSubmissionContext =
+        mock(ApplicationSubmissionContext.class);
+    for (int i = 0; i < jobPerSub * numSubclusters; i++) {
+      when(applicationSubmissionContext.getQueue()).thenReturn("queue" + i);
+      chosen = ((FederationRouterPolicy) getPolicy())
+          .getHomeSubcluster(applicationSubmissionContext);
+      counter.get(chosen).addAndGet(1);
+    }
+
+    // hash spread the jobs equally among the subclusters
+    for (AtomicLong a : counter.values()) {
+      Assert.assertEquals(a.get(), jobPerSub);
+    }
+
+  }
+}

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java

@@ -21,7 +21,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@@ -36,7 +35,7 @@ import org.junit.Test;
  * Simple test class for the {@link LoadBasedRouterPolicy}. Test that the load
  * is properly considered for allocation.
  */
-public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest {
+public class TestLoadBasedRouterPolicy extends BaseRouterPoliciesTest {
 
   @Before
   public void setUp() throws Exception {

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java

@@ -23,7 +23,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@@ -38,7 +37,7 @@ import org.junit.Test;
  * Simple test class for the {@link PriorityRouterPolicy}. Tests that the
  * weights are correctly used for ordering the choice of sub-clusters.
  */
-public class TestPriorityRouterPolicy extends BaseFederationPoliciesTest {
+public class TestPriorityRouterPolicy extends BaseRouterPoliciesTest {
 
   @Before
   public void setUp() throws Exception {

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java

@@ -21,7 +21,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@@ -36,7 +35,7 @@ import org.junit.Test;
  * Simple test class for the {@link UniformRandomRouterPolicy}. Tests that one
  * of the active subcluster is chosen.
  */
-public class TestUniformRandomRouterPolicy extends BaseFederationPoliciesTest {
+public class TestUniformRandomRouterPolicy extends BaseRouterPoliciesTest {
 
   @Before
   public void setUp() throws Exception {

+ 10 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java

@@ -24,8 +24,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@@ -41,7 +41,7 @@ import org.junit.Test;
  * number of randomized tests to check we are weighiting correctly even if
  * clusters go inactive.
  */
-public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
+public class TestWeightedRandomRouterPolicy extends BaseRouterPoliciesTest {
 
   @Before
   public void setUp() throws Exception {
@@ -78,13 +78,18 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
   @Test
   public void testClusterChosenWithRightProbability() throws YarnException {
 
+    ApplicationSubmissionContext context =
+        mock(ApplicationSubmissionContext.class);
+    when(context.getQueue()).thenReturn("queue1");
+    setApplicationSubmissionContext(context);
+
     Map<SubClusterId, AtomicLong> counter = new HashMap<>();
     for (SubClusterIdInfo id : getPolicyInfo().getRouterPolicyWeights()
         .keySet()) {
       counter.put(id.toId(), new AtomicLong(0));
     }
 
-    float numberOfDraws = 1000000;
+    float numberOfDraws = 100000;
 
     for (float i = 0; i < numberOfDraws; i++) {
       SubClusterId chosenId = ((FederationRouterPolicy) getPolicy())
@@ -113,8 +118,8 @@ public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
         Assert.assertTrue(
             "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight
                 + " expected weight: " + expectedWeight,
-            expectedWeight == 0 || (actualWeight / expectedWeight) < 1.1
-                && (actualWeight / expectedWeight) > 0.9);
+            expectedWeight == 0 || (actualWeight / expectedWeight) < 1.2
+                && (actualWeight / expectedWeight) > 0.8);
       } else {
         Assert
             .assertTrue(