@@ -0,0 +1,1027 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class TestNodeLabelContainerAllocation {
+  private final int GB = 1024;
+
+  private YarnConfiguration conf;
+
+  RMNodeLabelsManager mgr;
+
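+  // All tests below run the CapacityScheduler with a NullRMNodeLabelsManager,
+  // an in-memory label manager for tests that skips the persistent label
+  // store.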
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+
+  private Configuration getConfigurationWithQueueLabels(Configuration config) {
+    CapacitySchedulerConfiguration conf =
+        new CapacitySchedulerConfiguration(config);
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 10);
+    conf.setMaximumCapacity(A, 15);
+    conf.setAccessibleNodeLabels(A, toSet("x"));
+    conf.setCapacityByLabel(A, "x", 100);
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    conf.setCapacity(B, 20);
+    conf.setAccessibleNodeLabels(B, toSet("y"));
+    conf.setCapacityByLabel(B, "y", 100);
+
+    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+    conf.setCapacity(C, 70);
+    conf.setMaximumCapacity(C, 70);
+    conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET);
+
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    conf.setQueues(A, new String[] {"a1"});
+    conf.setCapacity(A1, 100);
+    conf.setMaximumCapacity(A1, 100);
+    conf.setCapacityByLabel(A1, "x", 100);
+
+    final String B1 = B + ".b1";
+    conf.setQueues(B, new String[] {"b1"});
+    conf.setCapacity(B1, 100);
+    conf.setMaximumCapacity(B1, 100);
+    conf.setCapacityByLabel(B1, "y", 100);
+
+    final String C1 = C + ".c1";
+    conf.setQueues(C, new String[] {"c1"});
+    conf.setCapacity(C1, 100);
+    conf.setMaximumCapacity(C1, 100);
+
+    return conf;
+  }
+
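+  // Asserts that the container identified by containerId, if present among
+  // the attempt's live containers, was allocated on the expected host.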
+  private void checkTaskContainersHost(ApplicationAttemptId attemptId,
+      ContainerId containerId, ResourceManager rm, String host) {
+    YarnScheduler scheduler = rm.getRMContext().getScheduler();
+    SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId);
+
+    Assert.assertTrue(appReport.getLiveContainers().size() > 0);
+    for (RMContainer c : appReport.getLiveContainers()) {
+      if (c.getContainerId().equals(containerId)) {
+        Assert.assertEquals(host, c.getAllocatedNode().getHost());
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private <E> Set<E> toSet(E... elements) {
+    Set<E> set = Sets.newHashSet(elements);
+    return set;
+  }
+
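+  // Common pattern in the tests below: pre-populate mgr with the cluster's
+  // labels and node-to-label mappings, then override createNodeLabelManager()
+  // in an anonymous MockRM subclass so the RM picks up the injected manager.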
+  @Test(timeout = 300000)
+  public void testContainerAllocationWithSingleUserLimits() throws Exception {
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+        NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+    rm1.registerNode("h2:1234", 8000); // label = y
+    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+
+    // launch an app to queue a1 (label = x), and check that all of its
+    // labeled containers are allocated on h1
+    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // A has only 10% of the no-label partition, so it can allocate only one
+    // container with label=empty
+    ContainerId containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
+    Assert.assertTrue(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    // Cannot allocate a 2nd label=empty container
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
+    Assert.assertFalse(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+
+    // A has default user limit = 100, so it can use all resources in
+    // label = x: up to floor(8000 / 1024) = 7 containers
+    for (int id = 3; id <= 8; id++) {
+      containerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), id);
+      am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
+      Assert.assertTrue(rm1.waitForState(nm1, containerId,
+          RMContainerState.ALLOCATED, 10 * 1000));
+    }
+    rm1.close();
+  }
+
+  @Test(timeout = 300000)
+  public void testContainerAllocateWithComplexLabels() throws Exception {
+    /*
+     * Queue structure:
+     *                      root (*)
+     *                  ________________
+     *                 /                \
+     *               a x(100%), y(50%)   b y(50%), z(100%)
+     *               ________________    ______________
+     *              /                   /              \
+     *             a1 (x,y)         b1(no)              b2(y,z)
+     *               100%                          y = 100%, z = 100%
+     *
+     * Node structure:
+     * h1 : x
+     * h2 : y
+     * h3 : y
+     * h4 : z
+     * h5 : NO
+     *
+     * Total resource:
+     * x: 4G
+     * y: 6G
+     * z: 2G
+     * *: 2G
+     *
+     * Resource of
+     * a1: x=4G, y=3G, NO=0.2G
+     * b1: NO=0.9G (max=1G)
+     * b2: y=3G, z=2G, NO=0.9G (max=1G)
+     *
+     * Each node can only allocate two containers
+     */
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+        toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
+        NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0),
+        toSet("z"), NodeId.newInstance("h5", 0),
+        RMNodeLabelsManager.EMPTY_STRING_SET));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 2048);
+    MockNM nm2 = rm1.registerNode("h2:1234", 2048);
+    MockNM nm3 = rm1.registerNode("h3:1234", 2048);
+    MockNM nm4 = rm1.registerNode("h4:1234", 2048);
+    MockNM nm5 = rm1.registerNode("h5:1234", 2048);
+
+    ContainerId containerId;
+
+    // launch an app to queue a1 (label = x); its AM container is launched
+    // on h1
+    RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // request a container (label = y); it can be allocated on nm2
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L);
+    Assert.assertTrue(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // launch an app to queue b1 (no label); its AM container is launched
+    // on h5
+    RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5);
+
+    // the AM container already uses up b1's queue capacity (maximum capacity
+    // reached), so requesting one more container cannot succeed
+    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm4, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertFalse(rm1.waitForState(nm5, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+
+    // launch an app to queue b2
+    RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5);
+
+    // request a container (label = y). It cannot go to nm1 (label = x) but
+    // will successfully allocate on nm3 (label = y)
+    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+        "h3");
+
+    // try to allocate a container (request label = z); it will successfully
+    // allocate on nm4 (label = z) only
+    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z");
+    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L);
+    Assert.assertTrue(rm1.waitForState(nm4, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+        "h4");
+
+    rm1.close();
+  }
+
+  @Test(timeout = 120000)
+  public void testContainerAllocateWithLabels() throws Exception {
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+        NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+
+    ContainerId containerId;
+
+    // launch an app to queue a1 (label = x), and check that its task
+    // container is allocated on h1
+    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3);
+
+    // request a container.
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    // launch an app to queue b1 (label = y), and check that its task
+    // container is allocated on h2
+    RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
+
+    // request a container.
+    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // launch an app to queue c1 (label = ""), and check that its task
+    // container is allocated on h3
+    RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+    // request a container.
+    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+        "h3");
+
+    rm1.close();
+  }
+
+  @Test(timeout = 120000)
+  public void testContainerAllocateWithDefaultQueueLabels() throws Exception {
+    // This test is very similar to testContainerAllocateWithLabels. The
+    // difference is that it doesn't specify a label expression in the
+    // ResourceRequest; instead, it relies on each queue's default node label
+    // expression.
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+        NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+
+    ContainerId containerId;
+
+    // launch an app to queue a1 (default label = x), and check that its task
+    // container is allocated on h1
+    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // request a container.
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    // launch an app to queue b1 (default label = y), and check that its task
+    // container is allocated on h2
+    RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // request a container.
+    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // launch an app to queue c1 (default label = ""), and check that its task
+    // container is allocated on h3
+    RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+    // request a container.
+    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+        "h3");
+
+    rm1.close();
+  }
+
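+  // Asserts the attempt's total pending memory at the given priority, i.e.
+  // capability * numContainers of its outstanding ANY ("*") resource request.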
+  private void checkPendingResource(MockRM rm, int priority,
+      ApplicationAttemptId attemptId, int memory) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
+    FiCaSchedulerApp app = cs.getApplicationAttempt(attemptId);
+    ResourceRequest rr =
+        app.getAppSchedulingInfo().getResourceRequest(
+            Priority.newInstance(priority), "*");
+    Assert.assertEquals(memory,
+        rr.getCapability().getMemory() * rr.getNumContainers());
+  }
+
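+  // Asserts how many containers (AM and task) are currently allocated on the
+  // given node.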
+  private void checkLaunchedContainerNumOnNode(MockRM rm, NodeId nodeId,
+      int numContainers) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
+    SchedulerNode node = cs.getSchedulerNode(nodeId);
+    Assert.assertEquals(numContainers, node.getNumContainers());
+  }
+
+  @Test
+  public void testPreferenceOfNeedyAppsTowardsNodePartitions() throws Exception {
+    /**
+     * Test case: Submit two applications to a queue (app1 first, then app2).
+     * App1 asks for no-label containers and app2 asks for label=y containers.
+     * When node1 (label=y) does a heartbeat, app2 gets its allocation first,
+     * even though app2 was submitted later than app1.
+     */
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Make y a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("y", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
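+    // (A non-exclusive partition may lend its idle resources to requests
+    // that asked for no label; an exclusive partition never does.)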
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
+    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+    // launch an app to queue b1 (label = y); its AM container is launched
+    // on nm2
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+    // launch another app to queue b1 (label = y); its AM container is also
+    // launched on nm2
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // request containers and let nm1 heartbeat (nm1 has label=y). Note that
+    // app1 requests non-labeled containers and app2 requests labeled
+    // containers; app2 gets allocated first even though app1 was submitted
+    // first.
+    am1.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>());
+    am2.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>(), "y");
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Do node heartbeats many times
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
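+    // (With its default settings the scheduler assigns at most one container
+    // per node heartbeat, so the test heartbeats repeatedly until every
+    // outstanding request has had a chance to be satisfied.)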
+
+    // App2 gets preference on node1, and node1 will be fully used by app2.
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(am2.getApplicationAttemptId());
+    // app1 gets nothing on nm1 (partition=y)
+    checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(), schedulerApp1);
+    checkNumOfContainersInAnAppOnGivenNode(9, nm2.getNodeId(), schedulerApp1);
+    // app2 gets all resources on nm1 (partition=y)
+    checkNumOfContainersInAnAppOnGivenNode(8, nm1.getNodeId(), schedulerApp2);
+    checkNumOfContainersInAnAppOnGivenNode(1, nm2.getNodeId(), schedulerApp2);
+
+    rm1.close();
+  }
+
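+  // Counts the attempt's live containers on the given node and compares with
+  // the expected number.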
+  private void checkNumOfContainersInAnAppOnGivenNode(int expectedNum,
+      NodeId nodeId, FiCaSchedulerApp app) {
+    int num = 0;
+    for (RMContainer container : app.getLiveContainers()) {
+      if (container.getAllocatedNode().equals(nodeId)) {
+        num++;
+      }
+    }
+    Assert.assertEquals(expectedNum, num);
+  }
+
+  @Test
+  public void
+      testPreferenceOfNeedyPrioritiesUnderSameAppTowardsNodePartitions()
+      throws Exception {
+    /**
+     * Test case: Submit one application that asks for label="" at priority=1
+     * and label="y" at priority=2. When a node with label=y heartbeats,
+     * priority=2 gets its allocation first, even though there are pending
+     * resources at priority=1.
+     */
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Make y a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("y", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
+    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+    ContainerId nextContainerId;
+
+    // launch an app to queue b1 (label = y); its AM container is launched
+    // on nm2
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+    // request containers from am1: priority=1 asks for "" and priority=2
+    // asks for "y"; the "y" container should be allocated first
+    nextContainerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    am1.allocate("*", 1 * GB, 1, 1, new ArrayList<ContainerId>(), "");
+    am1.allocate("*", 1 * GB, 1, 2, new ArrayList<ContainerId>(), "y");
+    Assert.assertTrue(rm1.waitForState(nm1, nextContainerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+
+    // Check pending resources for am1: priority=1 doesn't get allocated
+    // before priority=2 is allocated
+    checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 1 * GB);
+    checkPendingResource(rm1, 2, am1.getApplicationAttemptId(), 0 * GB);
+
+    rm1.close();
+  }
+
+  @Test
+  public void testNonLabeledResourceRequestGetPreferrenceToNonLabeledNode()
+      throws Exception {
+    /**
+     * Test case: Submit one application that asks for 6 label="" containers.
+     * NM1 has label=x and NM2 has no label; NM1/NM2 heartbeat together. Even
+     * though NM1 has idle resources, all containers are allocated on NM2,
+     * since non-labeled requests should get allocated on non-labeled nodes
+     * first.
+     */
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Make x a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+    ContainerId nextContainerId;
+
+    // launch an app to queue b1 (label = y); its AM container is launched
+    // on nm2
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+    // request containers from am1: priority=1 asks for "" * 6 (container ids
+    // 2 to 7). nm1/nm2 heartbeat at the same time; check that the containers
+    // are always allocated on nm2. This verifies that when resource is
+    // available in the non-labeled partition, non-labeled requests are
+    // allocated in the non-labeled partition first.
+    am1.allocate("*", 1 * GB, 6, 1, new ArrayList<ContainerId>(), "");
+    for (int i = 2; i < 2 + 6; i++) {
+      nextContainerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), i);
+      Assert.assertTrue(rm1.waitForState(Arrays.asList(nm1, nm2),
+          nextContainerId, RMContainerState.ALLOCATED, 10 * 1000));
+    }
+    // no container allocated on nm1
+    checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 0);
+    // all 7 containers (1 AM container + 6 task containers) allocated on nm2
+    checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 7);
+
+    rm1.close();
+  }
+
+  @Test
+  public void testPreferenceOfQueuesTowardsNodePartitions()
+      throws Exception {
+    /**
+     * Test case: given the following queue structure:
+     *
+     * <pre>
+     *            root
+     *          /  |  \
+     *         a   b   c
+     *        / \ / \ / \
+     *      a1 a2 b1 b2 c1 c2
+     *     (x)   (x)   (x)
+     * </pre>
+     *
+     * Only a1, b1, c1 can access label=x, and their default label is x. Each
+     * of them has one application asking for 5 containers. NM1 has label=x.
+     *
+     * After NM1 heartbeats 15 times, all 15 containers with label=x should be
+     * allocated.
+     */
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 33);
+    csConf.setAccessibleNodeLabels(A, toSet("x"));
+    csConf.setCapacityByLabel(A, "x", 33);
+    csConf.setQueues(A, new String[] {"a1", "a2"});
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(B, 33);
+    csConf.setAccessibleNodeLabels(B, toSet("x"));
+    csConf.setCapacityByLabel(B, "x", 33);
+    csConf.setQueues(B, new String[] {"b1", "b2"});
+
+    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+    csConf.setCapacity(C, 34);
+    csConf.setAccessibleNodeLabels(C, toSet("x"));
+    csConf.setCapacityByLabel(C, "x", 34);
+    csConf.setQueues(C, new String[] {"c1", "c2"});
+
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    csConf.setCapacity(A1, 50);
+    csConf.setCapacityByLabel(A1, "x", 100);
+    csConf.setDefaultNodeLabelExpression(A1, "x");
+
+    final String A2 = A + ".a2";
+    csConf.setCapacity(A2, 50);
+    csConf.setCapacityByLabel(A2, "x", 0);
+
+    final String B1 = B + ".b1";
+    csConf.setCapacity(B1, 50);
+    csConf.setCapacityByLabel(B1, "x", 100);
+    csConf.setDefaultNodeLabelExpression(B1, "x");
+
+    final String B2 = B + ".b2";
+    csConf.setCapacity(B2, 50);
+    csConf.setCapacityByLabel(B2, "x", 0);
+
+    final String C1 = C + ".c1";
+    csConf.setCapacity(C1, 50);
+    csConf.setCapacityByLabel(C1, "x", 100);
+    csConf.setDefaultNodeLabelExpression(C1, "x");
+
+    final String C2 = C + ".c2";
+    csConf.setCapacity(C2, 50);
+    csConf.setCapacityByLabel(C2, "x", 0);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Make x a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+    // app1 -> a1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // app2 -> a2
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a2");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // app3 -> b1
+    RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+
+    // app4 -> b2
+    RMApp app4 = rm1.submitApp(1 * GB, "app", "user", null, "b2");
+    MockAM am4 = MockRM.launchAndRegisterAM(app4, rm1, nm2);
+
+    // app5 -> c1
+    RMApp app5 = rm1.submitApp(1 * GB, "app", "user", null, "c1");
+    MockAM am5 = MockRM.launchAndRegisterAM(app5, rm1, nm1);
+
+    // app6 -> c2
+    RMApp app6 = rm1.submitApp(1 * GB, "app", "user", null, "c2");
+    MockAM am6 = MockRM.launchAndRegisterAM(app6, rm1, nm2);
+
+    // Each application requests 5 * 1GB containers
+    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    am3.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    am4.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    am5.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    am6.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+
+    // NM1 does 15 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    for (int i = 0; i < 15; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // NM1 gets 15 new containers (18 total: 15 task containers plus the 3 AM
+    // containers launched there)
+    checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 18);
+
+    // Check pending resources of each application:
+    // APP1/APP3/APP5 are satisfied, and APP2/APP4/APP6 get nothing.
+    checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 0 * GB);
+    checkPendingResource(rm1, 1, am2.getApplicationAttemptId(), 5 * GB);
+    checkPendingResource(rm1, 1, am3.getApplicationAttemptId(), 0 * GB);
+    checkPendingResource(rm1, 1, am4.getApplicationAttemptId(), 5 * GB);
+    checkPendingResource(rm1, 1, am5.getApplicationAttemptId(), 0 * GB);
+    checkPendingResource(rm1, 1, am6.getApplicationAttemptId(), 5 * GB);
+
+    rm1.close();
+  }
+
+  @Test
+  public void testQueuesWithoutAccessUsingPartitionedNodes() throws Exception {
+    /**
+     * Test case: given the following queue structure:
+     *
+     * <pre>
+     *       root
+     *      /    \
+     *     a      b
+     *    (x)
+     * </pre>
+     *
+     * Only a can access label=x. There are two nodes in the cluster: n1 has
+     * label x and n2 has no label.
+     *
+     * With user-limit-factor=5, one application submitted to queue b
+     * requesting many containers should be able to use up all cluster
+     * resources.
+     */
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 50);
+    csConf.setAccessibleNodeLabels(A, toSet("x"));
+    csConf.setCapacityByLabel(A, "x", 100);
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(B, 50);
+    csConf.setAccessibleNodeLabels(B, new HashSet<String>());
+    csConf.setUserLimitFactor(B, 5);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
+    // Make x a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty>
+
+    // app1 -> b
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+    // The application requests 50 * 1GB containers
+    am1.allocate("*", 1 * GB, 50, new ArrayList<ContainerId>());
+
+    // NM1/NM2 do 50 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+    // How many cycles we waited before getting an allocation on the
+    // partitioned node
+    int cycleWaited = 0;
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      if (schedulerNode1.getNumContainers() == 0) {
+        cycleWaited++;
+      }
+    }
+    // We wait 10 cycles before getting allocated on the partitioned node:
+    // NM2 can hold 10 containers in total; excluding the already-allocated AM
+    // container, we wait 9 cycles to fill the non-partitioned node, plus one
+    // more cycle before allocating on the partitioned node.
+    Assert.assertEquals(10, cycleWaited);
+
+    // Both NM1/NM2 launched 10 containers; cluster resources are exhausted
+    checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 10);
+    checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 10);
+
+    rm1.close();
+  }
+
+  @Test
+  public void testAMContainerAllocationWillAlwaysBeExclusive()
+      throws Exception {
+    /**
+     * Test case: Submit one application without a partition. Trying to
+     * allocate on a node with partition=x should fail, since AM containers
+     * always respect partition exclusivity.
+     */
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Make x a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+
+    // submit an app to queue b1
+    rm1.submitApp(1 * GB, "app", "user", null, "b1");
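+    // (The AM is never launched: its AM request may not be satisfied by nm1,
+    // the only node, which sits in partition x, even though x is
+    // non-exclusive.)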
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // Heartbeat many times; app1 should get nothing
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId())
+        .getNumContainers());
+
+    rm1.close();
+  }
+
+  @Test
+  public void
+      testQueueMaxCapacitiesWillNotBeHonoredWhenNotRespectingExclusivity()
+      throws Exception {
+    /**
+     * Test case: given the following queue structure:
+     *
+     * <pre>
+     *       root
+     *      /    \
+     *     a      b
+     *    (x)    (x)
+     * </pre>
+     *
+     * a/b can access x, and both of them have max-capacity-on-x = 50.
+     *
+     * When doing non-exclusive allocation, an app in a (or b) can use 100% of
+     * x's resources.
+     */
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
+        "b" });
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 50);
+    csConf.setAccessibleNodeLabels(A, toSet("x"));
+    csConf.setCapacityByLabel(A, "x", 50);
+    csConf.setMaximumCapacityByLabel(A, "x", 50);
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(B, 50);
+    csConf.setAccessibleNodeLabels(B, toSet("x"));
+    csConf.setCapacityByLabel(B, "x", 50);
+    csConf.setMaximumCapacityByLabel(B, "x", 50);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
+    // Make x a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty>
+
+    // app1 -> a
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+    // app1 asks for 10 partition="" containers
+    am1.allocate("*", 1 * GB, 10, new ArrayList<ContainerId>());
+
+    // NM1 does 50 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // app1 gets all resources in partition=x
+    Assert.assertEquals(10, schedulerNode1.getNumContainers());
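+    // Queue a's max-capacity-on-x (50%) was not enforced for these
+    // non-exclusive allocations: all 10 GB of nm1 went to app1.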
+
+    rm1.close();
+  }
+}