@@ -0,0 +1,677 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestCapacitySchedulerPreemption {
+  private static final Log LOG = LogFactory.getLog(
+      TestCapacitySchedulerPreemption.class);
+
+  private final int GB = 1024;
+
+  private Configuration conf;
+
+  RMNodeLabelsManager mgr;
+
+  Clock clock;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+        ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
+    conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
+
+    // Set preemption related configurations:
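+    // (a short gloss on the knobs below, per their documented semantics)
+    // - WAIT_TIME_BEFORE_KILL = 0: no grace period between selecting a
+    //   container and acting on it
+    // - LAZY_PREEMPTION_ENALBED (the constant name is misspelled in
+    //   CapacitySchedulerConfiguration): mark containers "killable" for the
+    //   scheduler to reuse instead of killing them outright
+    // - TOTAL_PREEMPTION_PER_ROUND / NATURAL_TERMINATION_FACTOR = 1.0: allow
+    //   the full ideal amount to be preempted in a single round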
+    conf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL,
+        0);
+    conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
+        true);
+    conf.setFloat(
+        ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND, 1.0f);
+    conf.setFloat(
+        ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR, 1.0f);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(this.conf);
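+    // Mock the clock and pin it at t=0 so time-based behavior in the test is
+    // deterministic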
+    clock = mock(Clock.class);
+    when(clock.getTime()).thenReturn(0L);
+  }
+
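+  /**
+   * Returns the scheduling edit policy from the RM's SchedulingMonitor
+   * service, or null when no monitor is running.
+   */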
+  private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
+    RMActiveServices activeServices = rm.getRMActiveService();
+    SchedulingMonitor mon = null;
+    for (Service service : activeServices.getServices()) {
+      if (service instanceof SchedulingMonitor) {
+        mon = (SchedulingMonitor) service;
+        break;
+      }
+    }
+
+    if (mon != null) {
+      return mon.getSchedulingEditPolicy();
+    }
+    return null;
+  }
+
+  @Test (timeout = 60000)
+  public void testSimplePreemption() throws Exception {
+    /**
+     * Test case: submit two applications (app1/app2) to different queues, with
+     * queue structure:
+     *
+     * <pre>
+     *            Root
+     *           /  |  \
+     *          a   b   c
+     *         10   20  70
+     * </pre>
+     *
+     * 1) Two nodes in the cluster, each with 4G.
+     *
+     * 2) app1 is submitted to queue-a first and asks for 7 * 1G containers,
+     * using up almost all resources.
+     *
+     * 3) app2 is submitted to queue-c and asks for one 1G container (for its
+     * AM).
+     *
+     * Now the cluster is full.
+     *
+     * 4) app2 asks for another 1G container; the system preempts one container
+     * from app1, and app2 receives the preempted resource.
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Launch an app in queue-a; its AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
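+    // app1 asks for 7 more 1G containers; the 3 rounds of node updates below
+    // yield 6 allocations, so app1 holds 7 containers (AM + 6) and exactly
+    // 1G is left for app2's AM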
+    am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now, and almost no available resource
+    // left in the cluster
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for its AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 both have available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemory());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getAvailableResource().getMemory());
+
+    // AM asks for a 1 * GB container
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check that one container from app1 is
+    // marked "killable"
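+    // (the first editSchedule() call selects containers to preempt; since
+    // WAIT_TIME_BEFORE_KILL is 0, the second call is expected to move them
+    // onto the scheduler's killable list)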
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+    Assert.assertEquals(1, killableContainers.size());
+    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+        .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if the container gets preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 now has 6 containers, and app2 has 2 containers
+    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionConsidersNodeLocalityDelay()
+      throws Exception {
+    /**
+     * Test case: same as testSimplePreemption, steps 1-3.
+     *
+     * Step 4: app2 asks for a 1G container with locality specified, so it has
+     * to wait for missed opportunities before it can be scheduled.
+     * Check that the system waits out the locality delay before allocating on
+     * top of the killable container.
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Launch an app in queue-a; its AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now (AM + 6), and no available resource
+    // left once app2's AM is launched
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for its AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 both have available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemory());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getAvailableResource().getMemory());
+
+    // AM asks for a 1 * GB container on an unknown host and an unknown rack
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1), ResourceRequest
+        .newInstance(Priority.newInstance(1), "unknownhost",
+            Resources.createResource(1 * GB), 1), ResourceRequest
+        .newInstance(Priority.newInstance(1), "/default-rack",
+            Resources.createResource(1 * GB), 1)), null);
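+    // Since the request names a specific host, the scheduler should hold off
+    // the off-switch assignment until enough missed scheduling opportunities
+    // (node-locality-delay) have accumulated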
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check that one container from app1 is
+    // marked "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+        .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if the container gets preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 still has 7 containers, and app2 has 1 container (nothing preempted
+    // yet because of the locality delay)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    // Do allocation again, one container will be preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // App1 has 6 containers, and app2 has 2 containers (new container allocated)
+    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionConsidersHardNodeLocality()
+      throws Exception {
+    /**
+     * Test case: same as testSimplePreemption, steps 1-3.
+     *
+     * Step 4: app2 asks for a 1G container with hard locality specified, and
+     * the requested host does not exist in the cluster.
+     * Confirm that the system doesn't preempt any container.
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Launch an app in queue-a; its AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1, then 3 times for node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now (AM + 6), and no available resource
+    // left once app2's AM is launched
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for its AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 both have available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemory());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getAvailableResource().getMemory());
+
+    // AM asks for a 1 * GB container on h3 with hard locality;
+    // h3 doesn't exist in the cluster
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1, true), ResourceRequest
+        .newInstance(Priority.newInstance(1), "h3",
+            Resources.createResource(1 * GB), 1, false), ResourceRequest
+        .newInstance(Priority.newInstance(1), "/default-rack",
+            Resources.createResource(1 * GB), 1, false)), null);
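+    // relaxLocality=false on the host and rack requests pins the container to
+    // h3 only; because h3 is not part of the cluster, the request can never be
+    // satisfied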
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check that one container from app1 is
+    // marked "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+        .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+    // Call CS.handle once to see if the container gets preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 still has 7 containers, and app2 has 1 container (nothing preempted)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    // Do allocation again, nothing will be preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // App1 still has 7 containers, and app2 has 1 container (nothing allocated)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
+      throws Exception {
+    /**
+     * Test case:
+     * <pre>
+     *            Root
+     *           /  |  \
+     *          a   b   c
+     *         10   20  70
+     * </pre>
+     * Submit applications to two queues, one of which uses more than the
+     * other, so preemption will happen.
+     *
+     * Check:
+     * 1) Resources of already-killable containers are excluded by PCPP (no
+     * duplicate containers are added to the killable list)
+     * 2) When more resources need to be preempted, new containers will be
+     * selected in addition to the existing killable containers
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // Launch an app in queue-a; its AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 6 times for node1
+    for (int i = 0; i < 6; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // App1 should have 7 containers now (AM + 6), and no available resource
+    // left once app2's AM is launched
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for its AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    // NM1 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemory());
+    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
+
+    // Get edit policy and do one update
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check that one container from app1 is
+    // marked "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+
+    // Check killable containers and to-be-preempted containers in edit policy
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
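+    // The container that is already marked killable is accounted for by the
+    // policy, so re-running it should not select a duplicate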
+
+    // Run edit schedule again, confirm that the status doesn't change
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Save the currently killable containers
+    Set<ContainerId> previousKillableContainers = new HashSet<>(
+        pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
+            .keySet());
+
+    // Update the requested resource of c from 1 to 2 containers, so one more
+    // container needs to be preempted
+    am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
+
+    // Call editPolicy.editSchedule() once; there should be 1 container in the
+    // to-preempt map and 1 container in the killable map
+    editPolicy.editSchedule();
+    Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
+
+    // Call editPolicy.editSchedule() once more; there should be 2 containers
+    // in the killable map
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+    // Check that the previous killable containers are included in the new
+    // killable containers
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+    Assert.assertTrue(
+        Sets.difference(previousKillableContainers, killableContainers.keySet())
+            .isEmpty());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
+      throws Exception {
+    /**
+     * Test case:
+     * <pre>
+     *            Root
+     *           /  |  \
+     *          a   b   c
+     *         10   20  70
+     * </pre>
+     * Submit applications to two queues, one of which uses more than the
+     * other, so preemption will happen.
+     *
+     * Check:
+     * 1) Containers are marked killable
+     * 2) The resource request is then reduced
+     * 3) Superfluous killable containers are cancelled from the policy and
+     * the scheduler
+     */
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // Launch an app in queue-a; its AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 6 times for node1
+    for (int i = 0; i < 6; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // App1 should have 7 containers now (AM + 6), and no available resource
+    // left once app2's AM is launched
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for its AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    // NM1 has available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemory());
+    am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
+
+    // Get edit policy and do one update
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check that 3 containers from app1 are
+    // marked "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    PreemptionManager pm = cs.getPreemptionManager();
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
+
+    // Change the request from 3G to 2G, so one fewer container needs to be
+    // preempted (3 -> 2)
+    am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+
+    // Call editSchedule once more to make sure nothing else happens
+    editPolicy.editSchedule();
+    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+
+    rm1.close();
+  }
+
+  @Test (timeout = 60000)
+  public void testPreemptionConsidersUserLimit()
+      throws Exception {
+    /**
+     * Test case: submit two applications (app1/app2) to different queues, with
+     * queue structure:
+     *
+     * <pre>
+     *            Root
+     *           /  |  \
+     *          a   b   c
+     *         10   20  70
+     * </pre>
+     *
+     * Queue-c's user-limit-factor = 0.1, so a single user cannot allocate more
+     * than 1 container in queue-c.
+     *
+     * 1) Two nodes in the cluster, each with 4G.
+     *
+     * 2) app1 is submitted to queue-a first and asks for 6 * 1G containers,
+     * using up almost all resources.
+     *
+     * 3) app2 is submitted to queue-c and asks for one 1G container (for its
+     * AM).
+     *
+     * Now the cluster is full.
+     *
+     * 4) app2 asks for another 1G container. Because of the user limit, no
+     * container is preempted from app1 and app2 gets nothing beyond its AM.
+     */
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
+    csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
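+    // With user-limit-factor = 0.1, a single user may only use a tenth of
+    // queue-c's capacity; the 1G AM container already exceeds that, so the
+    // pending request should not drive any preemption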
+    MockRM rm1 = new MockRM(csConf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // Launch an app in queue-a; its AM container should be launched on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+    // Do allocation 3 times for node1/node2
+    for (int i = 0; i < 3; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 7 containers now (AM + 6), and no available resource
+    // left once app2's AM is launched
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+    // Submit app2 to queue-c and ask for a 1G container for its AM
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // NM1/NM2 both have available resource = 0G
+    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemory());
+    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+        .getAvailableResource().getMemory());
+
+    // AM asks for a 1 * GB container
+    am2.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+            Resources.createResource(1 * GB), 1)), null);
+
+    // Get edit policy and do one update
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+    // Call edit schedule twice, and check that no container from app1 is
+    // marked "killable"
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    // No preemption happens
+    PreemptionManager pm = cs.getPreemptionManager();
+    Map<ContainerId, RMContainer> killableContainers =
+        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
+    Assert.assertEquals(0, killableContainers.size());
+
+    // Call CS.handle once to see if any container gets preempted
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+
+    // App1 still has 7 containers, and app2 has 1 container (nothing preempted)
+    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
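+  /**
+   * Polls the PreemptionManager until the killable-container map for the
+   * given queue/partition reaches the expected size (waiting at most ~5
+   * seconds), then asserts the size and returns the map.
+   */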
+  private Map<ContainerId, RMContainer> waitKillableContainersSize(
+      PreemptionManager pm, String queueName, String partition,
+      int expectedSize) throws InterruptedException {
+    Map<ContainerId, RMContainer> killableContainers =
+        pm.getKillableContainersMap(queueName, partition);
+
+    int wait = 0;
+    // Wait for at most 5 sec (it should be super fast actually)
+    while (expectedSize != killableContainers.size() && wait < 500) {
+      killableContainers = pm.getKillableContainersMap(queueName, partition);
+      Thread.sleep(10);
+      wait++;
+    }
+
+    Assert.assertEquals(expectedSize, killableContainers.size());
+    return killableContainers;
+  }
+}