@@ -1,677 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestCapacitySchedulerPreemption {
-  private static final Log LOG = LogFactory.getLog(
-      TestCapacitySchedulerPreemption.class);
-
-  private final int GB = 1024;
-
-  private Configuration conf;
-
-  RMNodeLabelsManager mgr;
-
-  Clock clock;
-
-  @Before
-  public void setUp() throws Exception {
-    conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
-    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
-        ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
-    conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
-
-    // Set preemption related configurations
-    conf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL,
-        0);
-    conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
-        true);
-    conf.setFloat(
-        ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND, 1.0f);
-    conf.setFloat(
-        ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR, 1.0f);
-    mgr = new NullRMNodeLabelsManager();
-    mgr.init(this.conf);
-    clock = mock(Clock.class);
-    when(clock.getTime()).thenReturn(0L);
-  }
-
-  private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
-    RMActiveServices activeServices = rm.getRMActiveService();
-    SchedulingMonitor mon = null;
-    for (Service service : activeServices.getServices()) {
-      if (service instanceof SchedulingMonitor) {
-        mon = (SchedulingMonitor) service;
-        break;
-      }
-    }
-
-    if (mon != null) {
-      return mon.getSchedulingEditPolicy();
-    }
-    return null;
-  }
-
-  @Test (timeout = 60000)
-  public void testSimplePreemption() throws Exception {
-    /**
-     * Test case: Submit two application (app1/app2) to different queues, queue
-     * structure:
-     *
-     * <pre>
-     *           Root
-     *          /  |  \
-     *         a   b   c
-     *        10   20  70
-     * </pre>
-     *
-     * 1) Two nodes in the cluster, each of them has 4G.
-     *
-     * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
-     * more resource available.
-     *
-     * 3) app2 submit to queue-c, ask for one 1G container (for AM)
-     *
-     * Now the cluster is fulfilled.
-     *
-     * 4) app2 asks for another 1G container, system will preempt one container
-     * from app1, and app2 will receive the preempted container
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getUnallocatedResource().getMemory());
-
-    // AM asks for a 1 * GB container
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if one container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-    Assert.assertEquals(1, killableContainers.size());
-    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
-        .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
-    // Call CS.handle once to see if container preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 6 containers, and app2 has 2 containers
-    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionConsidersNodeLocalityDelay()
-      throws Exception {
-    /**
-     * Test case: same as testSimplePreemption steps 1-3.
-     *
-     * Step 4: app2 asks for 1G container with locality specified, so it needs
-     * to wait for missed-opportunity before get scheduled.
-     * Check if system waits missed-opportunity before finish killable container
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getUnallocatedResource().getMemory());
-
-    // AM asks for a 1 * GB container with unknown host and unknown rack
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1), ResourceRequest
-        .newInstance(Priority.newInstance(1), "unknownhost",
-            Resources.createResource(1 * GB), 1), ResourceRequest
-        .newInstance(Priority.newInstance(1), "/default-rack",
-            Resources.createResource(1 * GB), 1)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if one container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
-        .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
-    // Call CS.handle once to see if container preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 7 containers, and app2 has 1 containers (no container preempted)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    // Do allocation again, one container will be preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    // App1 has 6 containers, and app2 has 2 containers (new container allocated)
-    Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionConsidersHardNodeLocality()
-      throws Exception {
-    /**
-     * Test case: same as testSimplePreemption steps 1-3.
-     *
-     * Step 4: app2 asks for 1G container with hard locality specified, and
-     * asked host is not existed
-     * Confirm system doesn't preempt any container.
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    }
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getUnallocatedResource().getMemory());
-
-    // AM asks for a 1 * GB container for h3 with hard locality,
-    // h3 doesn't exist in the cluster
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1, true), ResourceRequest
-        .newInstance(Priority.newInstance(1), "h3",
-            Resources.createResource(1 * GB), 1, false), ResourceRequest
-        .newInstance(Priority.newInstance(1), "/default-rack",
-            Resources.createResource(1 * GB), 1, false)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if one container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-    Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
-        .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
-    // Call CS.handle once to see if container preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 7 containers, and app2 has 1 containers (no container preempted)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    // Do allocation again, nothing will be preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    // App1 has 7 containers, and app2 has 1 containers (no container allocated)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
-      throws Exception {
-    /**
-     * Test case:
-     * <pre>
-     *           Root
-     *          /  |  \
-     *         a   b   c
-     *        10   20  70
-     * </pre>
-     * Submit applications to two queues, one uses more than the other, so
-     * preemption will happen.
-     *
-     * Check:
-     * 1) Killable containers resources will be excluded from PCPP (no duplicated
-     *    container added to killable list)
-     * 2) When more resources need to be preempted, new containers will be selected
-     *    and killable containers will be considered
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 6 times for node1
-    for (int i = 0; i < 6; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
-    // NM1 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
-
-    // Get edit policy and do one update
-    ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if one container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-
-    // Check killable containers and to-be-preempted containers in edit policy
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-    // Run edit schedule again, confirm status doesn't changed
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-    // Save current to kill containers
-    Set<ContainerId> previousKillableContainers = new HashSet<>(
-        pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
-            .keySet());
-
-    // Update request resource of c from 1 to 2, so we need to preempt
-    // one more container
-    am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
-
-    // Call editPolicy.editSchedule() once, we should have 1 container in to-preempt map
-    // and 1 container in killable map
-    editPolicy.editSchedule();
-    Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
-
-    // Call editPolicy.editSchedule() once more, we should have 2 containers killable map
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-    // Check if previous killable containers included by new killable containers
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-    Assert.assertTrue(
-        Sets.difference(previousKillableContainers, killableContainers.keySet())
-            .isEmpty());
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
-      throws Exception {
-    /**
-     * Test case:
-     * <pre>
-     *           Root
-     *          /  |  \
-     *         a   b   c
-     *        10   20  70
-     * </pre>
-     * Submit applications to two queues, one uses more than the other, so
-     * preemption will happen.
-     *
-     * Check:
-     * 1) Containers will be marked to killable
-     * 2) Cancel resource request
-     * 3) Killable containers will be cancelled from policy and scheduler
-     */
-    MockRM rm1 = new MockRM(conf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 6 times for node1
-    for (int i = 0; i < 6; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
-    // NM1 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
-
-    // Get edit policy and do one update
-    ProportionalCapacityPreemptionPolicy editPolicy =
-        (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if 3 container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    PreemptionManager pm = cs.getPreemptionManager();
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
-
-    // Change reqeust from 3G to 2G, now we can preempt one less container. (3->2)
-    am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-
-    // Call editSchedule once more to make sure still nothing happens
-    editPolicy.editSchedule();
-    Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-    waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-  }
-
-  @Test (timeout = 60000)
-  public void testPreemptionConsidersUserLimit()
-      throws Exception {
-    /**
-     * Test case: Submit two application (app1/app2) to different queues, queue
-     * structure:
-     *
-     * <pre>
-     *           Root
-     *          /  |  \
-     *         a   b   c
-     *        10   20  70
-     * </pre>
-     *
-     * Queue-c's user-limit-factor = 0.1, so single user cannot allocate >1 containers in queue-c
-     *
-     * 1) Two nodes in the cluster, each of them has 4G.
-     *
-     * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
-     * more resource available.
-     *
-     * 3) app2 submit to queue-c, ask for one 1G container (for AM)
-     *
-     * Now the cluster is fulfilled.
-     *
-     * 4) app2 asks for another 1G container, system will preempt one container
-     * from app1, and app2 will receive the preempted container
-     */
-    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
-    csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
-    MockRM rm1 = new MockRM(csConf);
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-    MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    // launch an app to queue, AM container should be launched in nm1
-    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-    // Do allocation 3 times for node1/node2
-    for (int i = 0; i < 3; i++) {
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    }
-
-    // App1 should have 7 containers now, and no available resource for cluster
-    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-        am1.getApplicationAttemptId());
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-    // Submit app2 to queue-c and asks for a 1G container for AM
-    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // NM1/NM2 has available resource = 0G
-    Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-        .getUnallocatedResource().getMemory());
-    Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-        .getUnallocatedResource().getMemory());
-
-    // AM asks for a 1 * GB container
-    am2.allocate(Arrays.asList(ResourceRequest
-        .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-            Resources.createResource(1 * GB), 1)), null);
-
-    // Get edit policy and do one update
-    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-    // Call edit schedule twice, and check if no container from app1 marked
-    // to be "killable"
-    editPolicy.editSchedule();
-    editPolicy.editSchedule();
-
-    // No preemption happens
-    PreemptionManager pm = cs.getPreemptionManager();
-    Map<ContainerId, RMContainer> killableContainers =
-        waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
-    Assert.assertEquals(0, killableContainers.size());
-
-    // Call CS.handle once to see if container preempted
-    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-        am2.getApplicationAttemptId());
-
-    // App1 has 7 containers, and app2 has 1 containers (nothing preempted)
-    Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-    rm1.close();
-  }
-
-  private Map<ContainerId, RMContainer> waitKillableContainersSize(
-      PreemptionManager pm, String queueName, String partition,
-      int expectedSize) throws InterruptedException {
-    Map<ContainerId, RMContainer> killableContainers =
-        pm.getKillableContainersMap(queueName, partition);
-
-    int wait = 0;
-    // Wait for at most 5 sec (it should be super fast actually)
-    while (expectedSize != killableContainers.size() && wait < 500) {
-      killableContainers = pm.getKillableContainersMap(queueName, partition);
-      Thread.sleep(10);
-      wait++;
-    }
-
-    Assert.assertEquals(expectedSize, killableContainers.size());
-    return killableContainers;
-  }
-}