|
@@ -17,1467 +17,259 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
|
|
|
|
|
-import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
-import org.apache.hadoop.yarn.api.records.Container;
|
|
|
-import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
-import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
|
|
-
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
|
|
|
- .TestUtils;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
|
|
-import org.apache.hadoop.yarn.util.ControlledClock;
|
|
|
-import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
-
|
|
|
import org.junit.After;
|
|
|
-import org.junit.Assert;
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
+import org.junit.runner.RunWith;
|
|
|
+import org.junit.runners.Parameterized;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.FileWriter;
|
|
|
import java.io.IOException;
|
|
|
import java.io.PrintWriter;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.HashSet;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Set;
|
|
|
-
|
|
|
-import static org.junit.Assert.assertEquals;
|
|
|
-import static org.junit.Assert.assertTrue;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collection;
|
|
|
|
|
|
+/**
|
|
|
+ * Tests to verify fairshare and minshare preemption, using parameterization.
|
|
|
+ */
|
|
|
+@RunWith(Parameterized.class)
|
|
|
public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
|
|
|
- private final static String ALLOC_FILE = new File(TEST_DIR,
|
|
|
- TestFairSchedulerPreemption.class.getName() + ".xml").getAbsolutePath();
|
|
|
+ private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
|
|
|
|
|
|
- private ControlledClock clock;
|
|
|
+ // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
|
|
|
+ private static final int NODE_CAPACITY_MULTIPLE = 4;
|
|
|
|
|
|
- private static class StubbedFairScheduler extends FairScheduler {
|
|
|
- public long lastPreemptMemory = -1;
|
|
|
+ private final boolean fairsharePreemption;
|
|
|
|
|
|
- @Override
|
|
|
- protected void preemptResources(Resource toPreempt) {
|
|
|
- lastPreemptMemory = toPreempt.getMemorySize();
|
|
|
- }
|
|
|
+ // App that takes up the entire cluster
|
|
|
+ private FSAppAttempt greedyApp;
|
|
|
|
|
|
- public void resetLastPreemptResources() {
|
|
|
- lastPreemptMemory = -1;
|
|
|
- }
|
|
|
+ // Starving app that is expected to instigate preemption
|
|
|
+ private FSAppAttempt starvingApp;
|
|
|
+
|
|
|
+ @Parameterized.Parameters
|
|
|
+ public static Collection<Boolean[]> getParameters() {
|
|
|
+ return Arrays.asList(new Boolean[][] {
|
|
|
+ {true}, {false}});
|
|
|
}
|
|
|
|
|
|
- public Configuration createConfiguration() {
|
|
|
- Configuration conf = super.createConfiguration();
|
|
|
- conf.setClass(YarnConfiguration.RM_SCHEDULER, StubbedFairScheduler.class,
|
|
|
- ResourceScheduler.class);
|
|
|
- conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
|
|
|
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
- return conf;
|
|
|
+ public TestFairSchedulerPreemption(Boolean fairshare) throws IOException {
|
|
|
+ fairsharePreemption = fairshare;
|
|
|
+ writeAllocFile();
|
|
|
}
|
|
|
|
|
|
@Before
|
|
|
- public void setup() throws IOException {
|
|
|
- conf = createConfiguration();
|
|
|
- clock = new ControlledClock();
|
|
|
+ public void setup() {
|
|
|
+ createConfiguration();
|
|
|
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
|
|
|
+ ALLOC_FILE.getAbsolutePath());
|
|
|
+ conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
|
|
|
+ conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
|
|
|
+ conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
|
|
|
}
|
|
|
|
|
|
@After
|
|
|
public void teardown() {
|
|
|
- if (resourceManager != null) {
|
|
|
- resourceManager.stop();
|
|
|
- resourceManager = null;
|
|
|
- }
|
|
|
+ ALLOC_FILE.delete();
|
|
|
conf = null;
|
|
|
- }
|
|
|
-
|
|
|
- private void startResourceManagerWithStubbedFairScheduler(float utilizationThreshold) {
|
|
|
- conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD,
|
|
|
- utilizationThreshold);
|
|
|
- resourceManager = new MockRM(conf);
|
|
|
- resourceManager.start();
|
|
|
-
|
|
|
- assertTrue(
|
|
|
- resourceManager.getResourceScheduler() instanceof StubbedFairScheduler);
|
|
|
- scheduler = (FairScheduler)resourceManager.getResourceScheduler();
|
|
|
-
|
|
|
- scheduler.setClock(clock);
|
|
|
- scheduler.updateInterval = 60 * 1000;
|
|
|
- }
|
|
|
-
|
|
|
- // YARN-4648: The starting code for ResourceManager mock is originated from
|
|
|
- // TestFairScheduler. It should be keep as it was to guarantee no changing
|
|
|
- // behaviour of ResourceManager preemption.
|
|
|
- private void startResourceManagerWithRealFairScheduler() {
|
|
|
- scheduler = new FairScheduler();
|
|
|
- conf = new YarnConfiguration();
|
|
|
- conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
|
|
|
- ResourceScheduler.class);
|
|
|
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
|
|
|
- conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
|
|
|
- 1024);
|
|
|
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
|
|
|
- conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
|
|
|
- conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
|
|
|
- conf.setFloat(
|
|
|
- FairSchedulerConfiguration
|
|
|
- .RM_SCHEDULER_RESERVATION_THRESHOLD_INCREMENT_MULTIPLE,
|
|
|
- TEST_RESERVATION_THRESHOLD);
|
|
|
-
|
|
|
- resourceManager = new MockRM(conf);
|
|
|
-
|
|
|
- // TODO: This test should really be using MockRM. For now starting stuff
|
|
|
- // that is needed at a bare minimum.
|
|
|
- ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
|
|
|
- resourceManager.getRMContext().getStateStore().start();
|
|
|
-
|
|
|
- // to initialize the master key
|
|
|
- resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
|
|
|
-
|
|
|
- scheduler.setRMContext(resourceManager.getRMContext());
|
|
|
- }
|
|
|
-
|
|
|
- private void stopResourceManager() {
|
|
|
- if (scheduler != null) {
|
|
|
- scheduler.stop();
|
|
|
- scheduler = null;
|
|
|
- }
|
|
|
if (resourceManager != null) {
|
|
|
resourceManager.stop();
|
|
|
resourceManager = null;
|
|
|
}
|
|
|
- QueueMetrics.clearQueueMetrics();
|
|
|
- DefaultMetricsSystem.shutdown();
|
|
|
- }
|
|
|
-
|
|
|
- private void registerNodeAndSubmitApp(
|
|
|
- int memory, int vcores, int appContainers, int appMemory) {
|
|
|
- RMNode node1 = MockNodes.newNodeInfo(
|
|
|
- 1, Resources.createResource(memory, vcores), 1, "node1");
|
|
|
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
|
- scheduler.handle(nodeEvent1);
|
|
|
-
|
|
|
- assertEquals("Incorrect amount of resources in the cluster",
|
|
|
- memory, scheduler.rootMetrics.getAvailableMB());
|
|
|
- assertEquals("Incorrect amount of resources in the cluster",
|
|
|
- vcores, scheduler.rootMetrics.getAvailableVirtualCores());
|
|
|
-
|
|
|
- createSchedulingRequest(appMemory, "queueA", "user1", appContainers);
|
|
|
- scheduler.update();
|
|
|
- // Sufficient node check-ins to fully schedule containers
|
|
|
- for (int i = 0; i < 3; i++) {
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
|
|
- scheduler.handle(nodeUpdate1);
|
|
|
- }
|
|
|
- assertEquals("app1's request is not met",
|
|
|
- memory - appContainers * appMemory,
|
|
|
- scheduler.rootMetrics.getAvailableMB());
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testPreemptionWithFreeResources() throws Exception {
|
|
|
+ private void writeAllocFile() throws IOException {
|
|
|
+ /*
|
|
|
+ * Queue hierarchy:
|
|
|
+ * root
|
|
|
+ * |--- preemptable
|
|
|
+ * |--- child-1
|
|
|
+ * |--- child-2
|
|
|
+ * |--- nonpreemptible
|
|
|
+ * |--- child-1
|
|
|
+ * |--- child-2
|
|
|
+ */
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
out.println("<?xml version=\"1.0\"?>");
|
|
|
out.println("<allocations>");
|
|
|
- out.println("<queue name=\"default\">");
|
|
|
- out.println("<maxResources>0mb,0vcores</maxResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueA\">");
|
|
|
- out.println("<weight>1</weight>");
|
|
|
- out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueB\">");
|
|
|
- out.println("<weight>1</weight>");
|
|
|
- out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
|
|
|
- out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
|
|
|
- out.println("</allocations>");
|
|
|
- out.close();
|
|
|
-
|
|
|
- startResourceManagerWithStubbedFairScheduler(0f);
|
|
|
- // Create node with 4GB memory and 4 vcores
|
|
|
- registerNodeAndSubmitApp(4 * 1024, 4, 2, 1024);
|
|
|
-
|
|
|
- // Verify submitting another request triggers preemption
|
|
|
- createSchedulingRequest(1024, "queueB", "user1", 1, 1);
|
|
|
- scheduler.update();
|
|
|
- clock.tickSec(6);
|
|
|
-
|
|
|
- ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
|
|
|
- scheduler.preemptTasksIfNecessary();
|
|
|
- assertEquals("preemptResources() should have been called", 1024,
|
|
|
- ((StubbedFairScheduler) scheduler).lastPreemptMemory);
|
|
|
-
|
|
|
- resourceManager.stop();
|
|
|
-
|
|
|
- startResourceManagerWithStubbedFairScheduler(0.8f);
|
|
|
- // Create node with 4GB memory and 4 vcores
|
|
|
- registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
|
|
|
|
|
|
- // Verify submitting another request doesn't trigger preemption
|
|
|
- createSchedulingRequest(1024, "queueB", "user1", 1, 1);
|
|
|
- scheduler.update();
|
|
|
- clock.tickSec(6);
|
|
|
+ out.println("<queue name=\"preemptable\">");
|
|
|
+ writePreemptionParams(out);
|
|
|
|
|
|
- ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
|
|
|
- scheduler.preemptTasksIfNecessary();
|
|
|
- assertEquals("preemptResources() should not have been called", -1,
|
|
|
- ((StubbedFairScheduler) scheduler).lastPreemptMemory);
|
|
|
+ // Child-1
|
|
|
+ out.println("<queue name=\"child-1\">");
|
|
|
+ writeResourceParams(out);
|
|
|
+ out.println("</queue>");
|
|
|
|
|
|
- resourceManager.stop();
|
|
|
+ // Child-2
|
|
|
+ out.println("<queue name=\"child-2\">");
|
|
|
+ writeResourceParams(out);
|
|
|
+ out.println("</queue>");
|
|
|
|
|
|
- startResourceManagerWithStubbedFairScheduler(0.7f);
|
|
|
- // Create node with 4GB memory and 4 vcores
|
|
|
- registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
|
|
|
+ out.println("</queue>"); // end of preemptable queue
|
|
|
|
|
|
- // Verify submitting another request triggers preemption
|
|
|
- createSchedulingRequest(1024, "queueB", "user1", 1, 1);
|
|
|
- scheduler.update();
|
|
|
- clock.tickSec(6);
|
|
|
+ // Queue with preemption disallowed
|
|
|
+ out.println("<queue name=\"nonpreemptable\">");
|
|
|
+ out.println("<allowPreemptionFrom>false" +
|
|
|
+ "</allowPreemptionFrom>");
|
|
|
+ writePreemptionParams(out);
|
|
|
|
|
|
- ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
|
|
|
- scheduler.preemptTasksIfNecessary();
|
|
|
- assertEquals("preemptResources() should have been called", 1024,
|
|
|
- ((StubbedFairScheduler) scheduler).lastPreemptMemory);
|
|
|
- }
|
|
|
+ // Child-1
|
|
|
+ out.println("<queue name=\"child-1\">");
|
|
|
+ writeResourceParams(out);
|
|
|
+ out.println("</queue>");
|
|
|
|
|
|
- @Test (timeout = 5000)
|
|
|
- /**
|
|
|
- * Make sure containers are chosen to be preempted in the correct order.
|
|
|
- */
|
|
|
- public void testChoiceOfPreemptedContainers() throws Exception {
|
|
|
- startResourceManagerWithRealFairScheduler();
|
|
|
- conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
|
|
|
- conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
|
|
|
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
|
|
|
- conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
|
|
|
+ // Child-2
|
|
|
+ out.println("<queue name=\"child-2\">");
|
|
|
+ writeResourceParams(out);
|
|
|
+ out.println("</queue>");
|
|
|
|
|
|
- ControlledClock clock = new ControlledClock();
|
|
|
- scheduler.setClock(clock);
|
|
|
+ out.println("</queue>"); // end of nonpreemptable queue
|
|
|
|
|
|
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
- out.println("<?xml version=\"1.0\"?>");
|
|
|
- out.println("<allocations>");
|
|
|
- out.println("<queue name=\"queueA\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueB\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueC\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"default\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("</queue>");
|
|
|
out.println("</allocations>");
|
|
|
out.close();
|
|
|
|
|
|
- scheduler.init(conf);
|
|
|
- scheduler.start();
|
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
-
|
|
|
- // Create two nodes
|
|
|
- RMNode node1 =
|
|
|
- MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
|
|
|
- "127.0.0.1");
|
|
|
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
|
- scheduler.handle(nodeEvent1);
|
|
|
-
|
|
|
- RMNode node2 =
|
|
|
- MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2,
|
|
|
- "127.0.0.2");
|
|
|
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
|
|
- scheduler.handle(nodeEvent2);
|
|
|
-
|
|
|
- // Queue A and B each request two applications
|
|
|
- ApplicationAttemptId app1 =
|
|
|
- createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 1);
|
|
|
- createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
|
|
|
- ApplicationAttemptId app2 =
|
|
|
- createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 3);
|
|
|
- createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app2);
|
|
|
-
|
|
|
- ApplicationAttemptId app3 =
|
|
|
- createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1);
|
|
|
- createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app3);
|
|
|
- ApplicationAttemptId app4 =
|
|
|
- createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 3);
|
|
|
- createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app4);
|
|
|
-
|
|
|
- scheduler.update();
|
|
|
-
|
|
|
- scheduler.getQueueManager().getLeafQueue("queueA", true)
|
|
|
- .setPolicy(SchedulingPolicy.parse("fifo"));
|
|
|
- scheduler.getQueueManager().getLeafQueue("queueB", true)
|
|
|
- .setPolicy(SchedulingPolicy.parse("fair"));
|
|
|
+ assertTrue("Allocation file does not exist, not running the test",
|
|
|
+ ALLOC_FILE.exists());
|
|
|
+ }
|
|
|
|
|
|
- // Sufficient node check-ins to fully schedule containers
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
|
|
|
- for (int i = 0; i < 4; i++) {
|
|
|
- scheduler.handle(nodeUpdate1);
|
|
|
- scheduler.handle(nodeUpdate2);
|
|
|
+ private void writePreemptionParams(PrintWriter out) {
|
|
|
+ if (fairsharePreemption) {
|
|
|
+ out.println("<fairSharePreemptionThreshold>1" +
|
|
|
+ "</fairSharePreemptionThreshold>");
|
|
|
+ out.println("<fairSharePreemptionTimeout>0" +
|
|
|
+ "</fairSharePreemptionTimeout>");
|
|
|
+ } else {
|
|
|
+ out.println("<minSharePreemptionTimeout>0" +
|
|
|
+ "</minSharePreemptionTimeout>");
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
|
|
|
- assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
|
|
|
- assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
|
|
|
- assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
|
|
|
-
|
|
|
- // Now new requests arrive from queueC and default
|
|
|
- createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
|
|
|
- createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
|
|
|
- createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
|
|
|
- createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
|
|
|
- scheduler.update();
|
|
|
-
|
|
|
- // We should be able to claw back one container from queueA and queueB each.
|
|
|
- scheduler.preemptResources(Resources.createResource(2 * 1024));
|
|
|
- assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
|
|
|
- assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
|
|
|
-
|
|
|
- // First verify we are adding containers to preemption list for the app.
|
|
|
- // For queueA (fifo), app2 is selected.
|
|
|
- // For queueB (fair), app4 is selected.
|
|
|
- assertTrue("App2 should have container to be preempted",
|
|
|
- !Collections.disjoint(
|
|
|
- scheduler.getSchedulerApp(app2).getLiveContainers(),
|
|
|
- scheduler.getSchedulerApp(app2).getPreemptionContainers()));
|
|
|
- assertTrue("App4 should have container to be preempted",
|
|
|
- !Collections.disjoint(
|
|
|
- scheduler.getSchedulerApp(app2).getLiveContainers(),
|
|
|
- scheduler.getSchedulerApp(app2).getPreemptionContainers()));
|
|
|
-
|
|
|
- // Pretend 15 seconds have passed
|
|
|
- clock.tickSec(15);
|
|
|
-
|
|
|
- // Trigger a kill by insisting we want containers back
|
|
|
- scheduler.preemptResources(Resources.createResource(2 * 1024));
|
|
|
-
|
|
|
- // At this point the containers should have been killed (since we are not simulating AM)
|
|
|
- assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
|
|
|
- assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
|
|
|
- // Inside each app, containers are sorted according to their priorities.
|
|
|
- // Containers with priority 4 are preempted for app2 and app4.
|
|
|
- Set<RMContainer> set = new HashSet<RMContainer>();
|
|
|
- for (RMContainer container :
|
|
|
- scheduler.getSchedulerApp(app2).getLiveContainers()) {
|
|
|
- if (container.getAllocatedSchedulerKey().getPriority().getPriority() ==
|
|
|
- 4) {
|
|
|
- set.add(container);
|
|
|
- }
|
|
|
- }
|
|
|
- for (RMContainer container :
|
|
|
- scheduler.getSchedulerApp(app4).getLiveContainers()) {
|
|
|
- if (container.getAllocatedSchedulerKey().getPriority().getPriority() ==
|
|
|
- 4) {
|
|
|
- set.add(container);
|
|
|
- }
|
|
|
+ private void writeResourceParams(PrintWriter out) {
|
|
|
+ if (!fairsharePreemption) {
|
|
|
+ out.println("<minResources>4096mb,4vcores</minResources>");
|
|
|
}
|
|
|
- assertTrue("Containers with priority=4 in app2 and app4 should be " +
|
|
|
- "preempted.", set.isEmpty());
|
|
|
-
|
|
|
- // Trigger a kill by insisting we want containers back
|
|
|
- scheduler.preemptResources(Resources.createResource(2 * 1024));
|
|
|
-
|
|
|
- // Pretend 15 seconds have passed
|
|
|
- clock.tickSec(15);
|
|
|
-
|
|
|
- // We should be able to claw back another container from A and B each.
|
|
|
- // For queueA (fifo), continue preempting from app2.
|
|
|
- // For queueB (fair), even app4 has a lowest priority container with p=4, it
|
|
|
- // still preempts from app3 as app3 is most over fair share.
|
|
|
- scheduler.preemptResources(Resources.createResource(2 * 1024));
|
|
|
-
|
|
|
- assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
|
|
|
- assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
|
|
|
- assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
|
|
|
- assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
|
|
|
-
|
|
|
- // Now A and B are below fair share, so preemption shouldn't do anything
|
|
|
- scheduler.preemptResources(Resources.createResource(2 * 1024));
|
|
|
- assertTrue("App1 should have no container to be preempted",
|
|
|
- scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty());
|
|
|
- assertTrue("App2 should have no container to be preempted",
|
|
|
- scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty());
|
|
|
- assertTrue("App3 should have no container to be preempted",
|
|
|
- scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty());
|
|
|
- assertTrue("App4 should have no container to be preempted",
|
|
|
- scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
|
|
|
- stopResourceManager();
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testPreemptionIsNotDelayedToNextRound() throws Exception {
|
|
|
- startResourceManagerWithRealFairScheduler();
|
|
|
-
|
|
|
- conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
|
|
|
- conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
|
|
|
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
- conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
|
|
|
-
|
|
|
- ControlledClock clock = new ControlledClock();
|
|
|
- scheduler.setClock(clock);
|
|
|
-
|
|
|
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
- out.println("<?xml version=\"1.0\"?>");
|
|
|
- out.println("<allocations>");
|
|
|
- out.println("<queue name=\"queueA\">");
|
|
|
- out.println("<weight>8</weight>");
|
|
|
- out.println("<queue name=\"queueA1\" />");
|
|
|
- out.println("<queue name=\"queueA2\" />");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueB\">");
|
|
|
- out.println("<weight>2</weight>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
|
|
|
- out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
|
|
|
- out.println("</allocations>");
|
|
|
- out.close();
|
|
|
-
|
|
|
- scheduler.init(conf);
|
|
|
- scheduler.start();
|
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
-
|
|
|
- // Add a node of 8G
|
|
|
- RMNode node1 = MockNodes.newNodeInfo(1,
|
|
|
- Resources.createResource(8 * 1024, 8), 1, "127.0.0.1");
|
|
|
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
|
- scheduler.handle(nodeEvent1);
|
|
|
-
|
|
|
- // Run apps in queueA.A1 and queueB
|
|
|
- ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1,
|
|
|
- "queueA.queueA1", "user1", 7, 1);
|
|
|
- // createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
|
|
|
- ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB",
|
|
|
- "user2", 1, 1);
|
|
|
+ private void setupCluster() throws IOException {
|
|
|
+ resourceManager = new MockRM(conf);
|
|
|
+ resourceManager.start();
|
|
|
+ scheduler = (FairScheduler) resourceManager.getResourceScheduler();
|
|
|
|
|
|
- scheduler.update();
|
|
|
+ // Create and add two nodes to the cluster
|
|
|
+ addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
|
|
|
+ addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
|
|
|
+ }
|
|
|
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
|
|
- for (int i = 0; i < 8; i++) {
|
|
|
- scheduler.handle(nodeUpdate1);
|
|
|
+ private void sendEnoughNodeUpdatesToAssignFully() {
|
|
|
+ for (RMNode node : rmNodes) {
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
|
|
|
+ new NodeUpdateSchedulerEvent(node);
|
|
|
+ for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) {
|
|
|
+ scheduler.handle(nodeUpdateSchedulerEvent);
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- // verify if the apps got the containers they requested
|
|
|
- assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size());
|
|
|
- assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
|
|
|
-
|
|
|
- // Now submit an app in queueA.queueA2
|
|
|
- ApplicationAttemptId app3 = createSchedulingRequest(1 * 1024, 1,
|
|
|
- "queueA.queueA2", "user3", 7, 1);
|
|
|
- scheduler.update();
|
|
|
-
|
|
|
- // Let 11 sec pass
|
|
|
- clock.tickSec(11);
|
|
|
-
|
|
|
- scheduler.update();
|
|
|
- Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager()
|
|
|
- .getLeafQueue("queueA.queueA2", false), clock.getTime());
|
|
|
- assertEquals(3277, toPreempt.getMemorySize());
|
|
|
-
|
|
|
- // verify if the 3 containers required by queueA2 are preempted in the same
|
|
|
- // round
|
|
|
- scheduler.preemptResources(toPreempt);
|
|
|
- assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
|
|
|
- .size());
|
|
|
- stopResourceManager();
|
|
|
}
|
|
|
|
|
|
- @Test (timeout = 5000)
|
|
|
/**
|
|
|
- * Tests the timing of decision to preempt tasks.
|
|
|
+ * Submit application to {@code queue1} and take over the entire cluster.
|
|
|
+ * Submit application with larger containers to {@code queue2} that
|
|
|
+ * requires preemption from the first application.
|
|
|
+ *
|
|
|
+ * @param queue1 first queue
|
|
|
+ * @param queue2 second queue
|
|
|
+ * @throws InterruptedException if interrupted while waiting
|
|
|
*/
|
|
|
- public void testPreemptionDecision() throws Exception {
|
|
|
- startResourceManagerWithRealFairScheduler();
|
|
|
-
|
|
|
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
- ControlledClock clock = new ControlledClock();
|
|
|
- scheduler.setClock(clock);
|
|
|
-
|
|
|
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
- out.println("<?xml version=\"1.0\"?>");
|
|
|
- out.println("<allocations>");
|
|
|
- out.println("<queue name=\"default\">");
|
|
|
- out.println("<maxResources>0mb,0vcores</maxResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueA\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueB\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueC\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueD\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
|
|
|
- out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
|
|
|
- out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
|
|
|
- out.println("</allocations>");
|
|
|
- out.close();
|
|
|
-
|
|
|
- scheduler.init(conf);
|
|
|
- scheduler.start();
|
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
-
|
|
|
- // Create four nodes
|
|
|
- RMNode node1 =
|
|
|
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
|
|
|
- "127.0.0.1");
|
|
|
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
|
- scheduler.handle(nodeEvent1);
|
|
|
-
|
|
|
- RMNode node2 =
|
|
|
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
|
|
|
- "127.0.0.2");
|
|
|
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
|
|
- scheduler.handle(nodeEvent2);
|
|
|
-
|
|
|
- RMNode node3 =
|
|
|
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
|
|
|
- "127.0.0.3");
|
|
|
- NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
|
|
|
- scheduler.handle(nodeEvent3);
|
|
|
-
|
|
|
- // Queue A and B each request three containers
|
|
|
- ApplicationAttemptId app1 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
|
|
|
- ApplicationAttemptId app2 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
|
|
|
- ApplicationAttemptId app3 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
|
|
|
-
|
|
|
- ApplicationAttemptId app4 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
|
|
|
- ApplicationAttemptId app5 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
|
|
|
- ApplicationAttemptId app6 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
|
|
|
-
|
|
|
- scheduler.update();
|
|
|
-
|
|
|
- // Sufficient node check-ins to fully schedule containers
|
|
|
- for (int i = 0; i < 2; i++) {
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
|
|
- scheduler.handle(nodeUpdate1);
|
|
|
-
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
|
|
|
- scheduler.handle(nodeUpdate2);
|
|
|
-
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
|
|
|
- scheduler.handle(nodeUpdate3);
|
|
|
- }
|
|
|
-
|
|
|
- // Now new requests arrive from queues C and D
|
|
|
- ApplicationAttemptId app7 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
|
|
|
- ApplicationAttemptId app8 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
|
|
|
- ApplicationAttemptId app9 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
|
|
|
-
|
|
|
- ApplicationAttemptId app10 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
|
|
|
- ApplicationAttemptId app11 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
|
|
|
- ApplicationAttemptId app12 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
|
|
|
-
|
|
|
+ private void submitApps(String queue1, String queue2)
|
|
|
+ throws InterruptedException {
|
|
|
+ // Create an app that takes up all the resources on the cluster
|
|
|
+ ApplicationAttemptId appAttemptId1
|
|
|
+ = createSchedulingRequest(1024, 1, queue1, "default",
|
|
|
+ NODE_CAPACITY_MULTIPLE * rmNodes.size());
|
|
|
+ greedyApp = scheduler.getSchedulerApp(appAttemptId1);
|
|
|
scheduler.update();
|
|
|
+ sendEnoughNodeUpdatesToAssignFully();
|
|
|
+ assertEquals(8, greedyApp.getLiveContainers().size());
|
|
|
|
|
|
- FSLeafQueue schedC =
|
|
|
- scheduler.getQueueManager().getLeafQueue("queueC", true);
|
|
|
- FSLeafQueue schedD =
|
|
|
- scheduler.getQueueManager().getLeafQueue("queueD", true);
|
|
|
+ // Create an app that takes up all the resources on the cluster
|
|
|
+ ApplicationAttemptId appAttemptId2
|
|
|
+ = createSchedulingRequest(2048, 2, queue2, "default",
|
|
|
+ NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
|
|
|
+ starvingApp = scheduler.getSchedulerApp(appAttemptId2);
|
|
|
|
|
|
- assertTrue(Resources.equals(
|
|
|
- Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
|
|
|
- assertTrue(Resources.equals(
|
|
|
- Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
|
|
|
- // After minSharePreemptionTime has passed, they should want to preempt min
|
|
|
- // share.
|
|
|
- clock.tickSec(6);
|
|
|
- assertEquals(
|
|
|
- 1024, scheduler.resourceDeficit(schedC, clock.getTime()).getMemorySize());
|
|
|
- assertEquals(
|
|
|
- 1024, scheduler.resourceDeficit(schedD, clock.getTime()).getMemorySize());
|
|
|
+ // Sleep long enough to pass
|
|
|
+ Thread.sleep(10);
|
|
|
|
|
|
- // After fairSharePreemptionTime has passed, they should want to preempt
|
|
|
- // fair share.
|
|
|
scheduler.update();
|
|
|
- clock.tickSec(6);
|
|
|
- assertEquals(
|
|
|
- 1536 , scheduler.resourceDeficit(schedC, clock.getTime()).getMemorySize());
|
|
|
- assertEquals(
|
|
|
- 1536, scheduler.resourceDeficit(schedD, clock.getTime()).getMemorySize());
|
|
|
- stopResourceManager();
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
-/**
|
|
|
- * Tests the timing of decision to preempt tasks.
|
|
|
- */
|
|
|
- public void testPreemptionDecisionWithDRF() throws Exception {
|
|
|
- startResourceManagerWithRealFairScheduler();
|
|
|
-
|
|
|
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
- ControlledClock clock = new ControlledClock();
|
|
|
- scheduler.setClock(clock);
|
|
|
-
|
|
|
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
- out.println("<?xml version=\"1.0\"?>");
|
|
|
- out.println("<allocations>");
|
|
|
- out.println("<queue name=\"default\">");
|
|
|
- out.println("<maxResources>0mb,0vcores</maxResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueA\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("<minResources>1024mb,1vcores</minResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueB\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("<minResources>1024mb,2vcores</minResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueC\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("<minResources>1024mb,3vcores</minResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueD\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("<minResources>1024mb,2vcores</minResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
|
|
|
- out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
|
|
|
- out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
|
|
|
- out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
|
|
|
- out.println("</allocations>");
|
|
|
- out.close();
|
|
|
-
|
|
|
- scheduler.init(conf);
|
|
|
- scheduler.start();
|
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
-
|
|
|
- // Create four nodes
|
|
|
- RMNode node1 =
|
|
|
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 1,
|
|
|
- "127.0.0.1");
|
|
|
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
|
- scheduler.handle(nodeEvent1);
|
|
|
-
|
|
|
- RMNode node2 =
|
|
|
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 2,
|
|
|
- "127.0.0.2");
|
|
|
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
|
|
- scheduler.handle(nodeEvent2);
|
|
|
-
|
|
|
- RMNode node3 =
|
|
|
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 3,
|
|
|
- "127.0.0.3");
|
|
|
- NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
|
|
|
- scheduler.handle(nodeEvent3);
|
|
|
-
|
|
|
- // Queue A and B each request three containers
|
|
|
- ApplicationAttemptId app1 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
|
|
|
- ApplicationAttemptId app2 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
|
|
|
- ApplicationAttemptId app3 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
|
|
|
-
|
|
|
- ApplicationAttemptId app4 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
|
|
|
- ApplicationAttemptId app5 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
|
|
|
- ApplicationAttemptId app6 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
|
|
|
-
|
|
|
- scheduler.update();
|
|
|
-
|
|
|
- // Sufficient node check-ins to fully schedule containers
|
|
|
- for (int i = 0; i < 2; i++) {
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
|
|
- scheduler.handle(nodeUpdate1);
|
|
|
-
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
|
|
|
- scheduler.handle(nodeUpdate2);
|
|
|
-
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
|
|
|
- scheduler.handle(nodeUpdate3);
|
|
|
+ private void verifyPreemption() throws InterruptedException {
|
|
|
+ // Sleep long enough for four containers to be preempted. Note that the
|
|
|
+ // starved app must be queued four times for containers to be preempted.
|
|
|
+ for (int i = 0; i < 10000; i++) {
|
|
|
+ if (greedyApp.getLiveContainers().size() == 4) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ Thread.sleep(10);
|
|
|
}
|
|
|
|
|
|
- // Now new requests arrive from queues C and D
|
|
|
- ApplicationAttemptId app7 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
|
|
|
- ApplicationAttemptId app8 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
|
|
|
- ApplicationAttemptId app9 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
|
|
|
-
|
|
|
- ApplicationAttemptId app10 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 1);
|
|
|
- ApplicationAttemptId app11 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 2);
|
|
|
- ApplicationAttemptId app12 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 3);
|
|
|
-
|
|
|
- scheduler.update();
|
|
|
-
|
|
|
- FSLeafQueue schedC =
|
|
|
- scheduler.getQueueManager().getLeafQueue("queueC", true);
|
|
|
- FSLeafQueue schedD =
|
|
|
- scheduler.getQueueManager().getLeafQueue("queueD", true);
|
|
|
+ // Verify the right amount of containers are preempted from greedyApp
|
|
|
+ assertEquals(4, greedyApp.getLiveContainers().size());
|
|
|
|
|
|
- assertTrue(Resources.equals(
|
|
|
- Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
|
|
|
- assertTrue(Resources.equals(
|
|
|
- Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
|
|
|
-
|
|
|
- // Test :
|
|
|
- // 1) whether componentWise min works as expected.
|
|
|
- // 2) DRF calculator is used
|
|
|
-
|
|
|
- // After minSharePreemptionTime has passed, they should want to preempt min
|
|
|
- // share.
|
|
|
- clock.tickSec(6);
|
|
|
- Resource res = scheduler.resourceDeficit(schedC, clock.getTime());
|
|
|
- assertEquals(1024, res.getMemorySize());
|
|
|
- // Demand = 3
|
|
|
- assertEquals(3, res.getVirtualCores());
|
|
|
-
|
|
|
- res = scheduler.resourceDeficit(schedD, clock.getTime());
|
|
|
- assertEquals(1024, res.getMemorySize());
|
|
|
- // Demand = 6, but min share = 2
|
|
|
- assertEquals(2, res.getVirtualCores());
|
|
|
-
|
|
|
- // After fairSharePreemptionTime has passed, they should want to preempt
|
|
|
- // fair share.
|
|
|
- scheduler.update();
|
|
|
- clock.tickSec(6);
|
|
|
- res = scheduler.resourceDeficit(schedC, clock.getTime());
|
|
|
- assertEquals(1536, res.getMemorySize());
|
|
|
- assertEquals(3, res.getVirtualCores());
|
|
|
+ sendEnoughNodeUpdatesToAssignFully();
|
|
|
|
|
|
- res = scheduler.resourceDeficit(schedD, clock.getTime());
|
|
|
- assertEquals(1536, res.getMemorySize());
|
|
|
- // Demand = 6, but fair share = 3
|
|
|
- assertEquals(3, res.getVirtualCores());
|
|
|
- stopResourceManager();
|
|
|
+ // Verify the preempted containers are assigned to starvingApp
|
|
|
+ assertEquals(2, starvingApp.getLiveContainers().size());
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- /**
|
|
|
- * Tests the various timing of decision to preempt tasks.
|
|
|
- */
|
|
|
- public void testPreemptionDecisionWithVariousTimeout() throws Exception {
|
|
|
- startResourceManagerWithRealFairScheduler();
|
|
|
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
- ControlledClock clock = new ControlledClock();
|
|
|
- scheduler.setClock(clock);
|
|
|
-
|
|
|
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
- out.println("<?xml version=\"1.0\"?>");
|
|
|
- out.println("<allocations>");
|
|
|
- out.println("<queue name=\"default\">");
|
|
|
- out.println("<maxResources>0mb,0vcores</maxResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueA\">");
|
|
|
- out.println("<weight>1</weight>");
|
|
|
- out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueB\">");
|
|
|
- out.println("<weight>2</weight>");
|
|
|
- out.println("<minSharePreemptionTimeout>10</minSharePreemptionTimeout>");
|
|
|
- out.println("<fairSharePreemptionTimeout>25</fairSharePreemptionTimeout>");
|
|
|
- out.println("<queue name=\"queueB1\">");
|
|
|
- out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
- out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueB2\">");
|
|
|
- out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
- out.println("<fairSharePreemptionTimeout>20</fairSharePreemptionTimeout>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueC\">");
|
|
|
- out.println("<weight>1</weight>");
|
|
|
- out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
|
|
|
- out.print("<defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>");
|
|
|
- out.println("</allocations>");
|
|
|
- out.close();
|
|
|
-
|
|
|
- scheduler.init(conf);
|
|
|
- scheduler.start();
|
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
-
|
|
|
- // Check the min/fair share preemption timeout for each queue
|
|
|
- QueueManager queueMgr = scheduler.getQueueManager();
|
|
|
- assertEquals(30000, queueMgr.getQueue("root")
|
|
|
- .getFairSharePreemptionTimeout());
|
|
|
- assertEquals(30000, queueMgr.getQueue("default")
|
|
|
- .getFairSharePreemptionTimeout());
|
|
|
- assertEquals(30000, queueMgr.getQueue("queueA")
|
|
|
- .getFairSharePreemptionTimeout());
|
|
|
- assertEquals(25000, queueMgr.getQueue("queueB")
|
|
|
- .getFairSharePreemptionTimeout());
|
|
|
- assertEquals(25000, queueMgr.getQueue("queueB.queueB1")
|
|
|
- .getFairSharePreemptionTimeout());
|
|
|
- assertEquals(20000, queueMgr.getQueue("queueB.queueB2")
|
|
|
- .getFairSharePreemptionTimeout());
|
|
|
- assertEquals(30000, queueMgr.getQueue("queueC")
|
|
|
- .getFairSharePreemptionTimeout());
|
|
|
- assertEquals(15000, queueMgr.getQueue("root")
|
|
|
- .getMinSharePreemptionTimeout());
|
|
|
- assertEquals(15000, queueMgr.getQueue("default")
|
|
|
- .getMinSharePreemptionTimeout());
|
|
|
- assertEquals(15000, queueMgr.getQueue("queueA")
|
|
|
- .getMinSharePreemptionTimeout());
|
|
|
- assertEquals(10000, queueMgr.getQueue("queueB")
|
|
|
- .getMinSharePreemptionTimeout());
|
|
|
- assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
|
|
|
- .getMinSharePreemptionTimeout());
|
|
|
- assertEquals(10000, queueMgr.getQueue("queueB.queueB2")
|
|
|
- .getMinSharePreemptionTimeout());
|
|
|
- assertEquals(15000, queueMgr.getQueue("queueC")
|
|
|
- .getMinSharePreemptionTimeout());
|
|
|
-
|
|
|
- // Create one big node
|
|
|
- RMNode node1 =
|
|
|
- MockNodes.newNodeInfo(1, Resources.createResource(6 * 1024, 6), 1,
|
|
|
- "127.0.0.1");
|
|
|
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
|
- scheduler.handle(nodeEvent1);
|
|
|
-
|
|
|
- // Queue A takes all resources
|
|
|
- for (int i = 0; i < 6; i ++) {
|
|
|
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
|
|
|
- }
|
|
|
-
|
|
|
- scheduler.update();
|
|
|
-
|
|
|
- // Sufficient node check-ins to fully schedule containers
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
|
|
- for (int i = 0; i < 6; i++) {
|
|
|
- scheduler.handle(nodeUpdate1);
|
|
|
+ private void verifyNoPreemption() throws InterruptedException {
|
|
|
+ // Sleep long enough to ensure not even one container is preempted.
|
|
|
+ for (int i = 0; i < 600; i++) {
|
|
|
+ if (greedyApp.getLiveContainers().size() != 8) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ Thread.sleep(10);
|
|
|
}
|
|
|
-
|
|
|
- // Now new requests arrive from queues B1, B2 and C
|
|
|
- createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 1);
|
|
|
- createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 2);
|
|
|
- createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 3);
|
|
|
- createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 1);
|
|
|
- createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 2);
|
|
|
- createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 3);
|
|
|
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
|
|
|
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
|
|
|
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
|
|
|
-
|
|
|
- scheduler.update();
|
|
|
-
|
|
|
- FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", true);
|
|
|
- FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", true);
|
|
|
- FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true);
|
|
|
-
|
|
|
- assertTrue(Resources.equals(
|
|
|
- Resources.none(), scheduler.resourceDeficit(queueB1, clock.getTime())));
|
|
|
- assertTrue(Resources.equals(
|
|
|
- Resources.none(), scheduler.resourceDeficit(queueB2, clock.getTime())));
|
|
|
- assertTrue(Resources.equals(
|
|
|
- Resources.none(), scheduler.resourceDeficit(queueC, clock.getTime())));
|
|
|
-
|
|
|
- // After 5 seconds, queueB1 wants to preempt min share
|
|
|
- scheduler.update();
|
|
|
- clock.tickSec(6);
|
|
|
- assertEquals(
|
|
|
- 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
|
|
|
- assertEquals(
|
|
|
- 0, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
|
|
|
- assertEquals(
|
|
|
- 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
|
|
|
-
|
|
|
- // After 10 seconds, queueB2 wants to preempt min share
|
|
|
- scheduler.update();
|
|
|
- clock.tickSec(5);
|
|
|
- assertEquals(
|
|
|
- 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
|
|
|
- assertEquals(
|
|
|
- 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
|
|
|
- assertEquals(
|
|
|
- 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
|
|
|
-
|
|
|
- // After 15 seconds, queueC wants to preempt min share
|
|
|
- scheduler.update();
|
|
|
- clock.tickSec(5);
|
|
|
- assertEquals(
|
|
|
- 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
|
|
|
- assertEquals(
|
|
|
- 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
|
|
|
- assertEquals(
|
|
|
- 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
|
|
|
-
|
|
|
- // After 20 seconds, queueB2 should want to preempt fair share
|
|
|
- scheduler.update();
|
|
|
- clock.tickSec(5);
|
|
|
- assertEquals(
|
|
|
- 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
|
|
|
- assertEquals(
|
|
|
- 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
|
|
|
- assertEquals(
|
|
|
- 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
|
|
|
-
|
|
|
- // After 25 seconds, queueB1 should want to preempt fair share
|
|
|
- scheduler.update();
|
|
|
- clock.tickSec(5);
|
|
|
- assertEquals(
|
|
|
- 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
|
|
|
- assertEquals(
|
|
|
- 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
|
|
|
- assertEquals(
|
|
|
- 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
|
|
|
-
|
|
|
- // After 30 seconds, queueC should want to preempt fair share
|
|
|
- scheduler.update();
|
|
|
- clock.tickSec(5);
|
|
|
- assertEquals(
|
|
|
- 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemorySize());
|
|
|
- assertEquals(
|
|
|
- 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemorySize());
|
|
|
- assertEquals(
|
|
|
- 1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemorySize());
|
|
|
- stopResourceManager();
|
|
|
+ assertEquals(8, greedyApp.getLiveContainers().size());
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- /**
|
|
|
- * Tests the decision to preempt tasks respect to non-preemptable queues
|
|
|
- * 1, Queues as follow:
|
|
|
- * queueA(non-preemptable)
|
|
|
- * queueB(preemptable)
|
|
|
- * parentQueue(non-preemptable)
|
|
|
- * --queueC(preemptable)
|
|
|
- * queueD(preemptable)
|
|
|
- * 2, Submit request to queueA, queueB, queueC, and all of them are over MinShare
|
|
|
- * 3, Now all resource are occupied
|
|
|
- * 4, Submit request to queueD, and need to preempt resource from other queues
|
|
|
- * 5, Only preemptable queue(queueB) would be preempted.
|
|
|
- */
|
|
|
- public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception {
|
|
|
- startResourceManagerWithRealFairScheduler();
|
|
|
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
- ControlledClock clock = new ControlledClock();
|
|
|
- scheduler.setClock(clock);
|
|
|
-
|
|
|
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
|
- out.println("<?xml version=\"1.0\"?>");
|
|
|
- out.println("<allocations>");
|
|
|
- out.println("<queue name=\"default\">");
|
|
|
- out.println("<maxResources>0mb,0vcores</maxResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueA\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
- out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueB\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"parentQueue\">");
|
|
|
- out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
|
|
|
- out.println("<queue name=\"queueC\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("<minResources>1024mb,0vcores</minResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<queue name=\"queueD\">");
|
|
|
- out.println("<weight>.25</weight>");
|
|
|
- out.println("<minResources>2048mb,0vcores</minResources>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
|
|
|
- out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
|
|
|
- out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
|
|
|
- out.println("</allocations>");
|
|
|
- out.close();
|
|
|
-
|
|
|
- scheduler.init(conf);
|
|
|
- scheduler.start();
|
|
|
- scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
-
|
|
|
- // Create four nodes(3G each)
|
|
|
- RMNode node1 =
|
|
|
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
|
|
|
- "127.0.0.1");
|
|
|
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
|
- scheduler.handle(nodeEvent1);
|
|
|
-
|
|
|
- RMNode node2 =
|
|
|
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
|
|
|
- "127.0.0.2");
|
|
|
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
|
|
- scheduler.handle(nodeEvent2);
|
|
|
-
|
|
|
- RMNode node3 =
|
|
|
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
|
|
|
- "127.0.0.3");
|
|
|
- NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
|
|
|
- scheduler.handle(nodeEvent3);
|
|
|
-
|
|
|
- RMNode node4 =
|
|
|
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
|
|
|
- "127.0.0.4");
|
|
|
- NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
|
|
|
- scheduler.handle(nodeEvent4);
|
|
|
-
|
|
|
- // Submit apps to queueA, queueB, queueC,
|
|
|
- // now all resource of the cluster is occupied
|
|
|
- ApplicationAttemptId app1 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
|
|
|
- ApplicationAttemptId app2 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2);
|
|
|
- ApplicationAttemptId app3 =
|
|
|
- createSchedulingRequest(1 * 1024, "parentQueue.queueC", "user1", 4, 3);
|
|
|
-
|
|
|
- scheduler.update();
|
|
|
-
|
|
|
- // Sufficient node check-ins to fully schedule containers
|
|
|
- for (int i = 0; i < 3; i++) {
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
|
|
- scheduler.handle(nodeUpdate1);
|
|
|
-
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
|
|
|
- scheduler.handle(nodeUpdate2);
|
|
|
-
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
|
|
|
- scheduler.handle(nodeUpdate3);
|
|
|
-
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
|
|
|
- scheduler.handle(nodeUpdate4);
|
|
|
- }
|
|
|
-
|
|
|
- assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
|
|
|
- assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
|
|
|
- assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
|
|
|
-
|
|
|
- // Now new requests arrive from queues D
|
|
|
- ApplicationAttemptId app4 =
|
|
|
- createSchedulingRequest(1 * 1024, "queueD", "user1", 4, 1);
|
|
|
- scheduler.update();
|
|
|
- FSLeafQueue schedD =
|
|
|
- scheduler.getQueueManager().getLeafQueue("queueD", true);
|
|
|
-
|
|
|
- // After minSharePreemptionTime has passed, 2G resource should preempted from
|
|
|
- // queueB to queueD
|
|
|
- clock.tickSec(6);
|
|
|
- assertEquals(2048,
|
|
|
- scheduler.resourceDeficit(schedD, clock.getTime()).getMemorySize());
|
|
|
-
|
|
|
- scheduler.preemptResources(Resources.createResource(2 * 1024));
|
|
|
- // now only app2 is selected to be preempted
|
|
|
- assertTrue("App2 should have container to be preempted",
|
|
|
- !Collections.disjoint(
|
|
|
- scheduler.getSchedulerApp(app2).getLiveContainers(),
|
|
|
- scheduler.getSchedulerApp(app2).getPreemptionContainers()));
|
|
|
- assertTrue("App1 should not have container to be preempted",
|
|
|
- Collections.disjoint(
|
|
|
- scheduler.getSchedulerApp(app1).getLiveContainers(),
|
|
|
- scheduler.getSchedulerApp(app1).getPreemptionContainers()));
|
|
|
- assertTrue("App3 should not have container to be preempted",
|
|
|
- Collections.disjoint(
|
|
|
- scheduler.getSchedulerApp(app3).getLiveContainers(),
|
|
|
- scheduler.getSchedulerApp(app3).getPreemptionContainers()));
|
|
|
- // Pretend 20 seconds have passed
|
|
|
- clock.tickSec(20);
|
|
|
- scheduler.preemptResources(Resources.createResource(2 * 1024));
|
|
|
- for (int i = 0; i < 3; i++) {
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
|
|
- scheduler.handle(nodeUpdate1);
|
|
|
-
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
|
|
|
- scheduler.handle(nodeUpdate2);
|
|
|
-
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
|
|
|
- scheduler.handle(nodeUpdate3);
|
|
|
-
|
|
|
- NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
|
|
|
- scheduler.handle(nodeUpdate4);
|
|
|
+  public void testPreemptionWithinSameLeafQueue() throws Exception {
+    setupCluster();
+    String queue = "root.preemptable.child-1";
+    submitApps(queue, queue);
+    if (fairsharePreemption) {
+      verifyPreemption();
+    } else {
+      verifyNoPreemption();
    }
-    // after preemption
-    assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-    assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    stopResourceManager();
  }

  @Test
-  /**
-   * Tests the decision to preempt tasks when allowPreemptionFrom is set false on
-   * all queues.
-   * Then none of them would be preempted actually.
-   * 1, Queues as follow:
-   *   queueA(non-preemptable)
-   *   queueB(non-preemptable)
-   *   parentQueue(non-preemptable)
-   *     --queueC(preemptable)
-   *   parentQueue(preemptable)
-   *     --queueD(non-preemptable)
-   * 2, Submit request to queueB, queueC, queueD, and all of them are over MinShare
-   * 3, Now all resource are occupied
-   * 4, Submit request to queueA, and need to preempt resource from other queues
-   * 5, None of queues would be preempted.
-   */
-  public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues()
-      throws Exception {
-    startResourceManagerWithRealFairScheduler();
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    ControlledClock clock = new ControlledClock();
-    scheduler.setClock(clock);
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"default\">");
-    out.println("<maxResources>0mb,0vcores</maxResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>2048mb,0vcores</minResources>");
-    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
-    out.println("</queue>");
-    out.println("<queue name=\"parentQueue1\">");
-    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
-    out.println("<queue name=\"queueC\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("</queue>");
-    out.println("<queue name=\"parentQueue2\">");
-    out.println("<queue name=\"queueD\">");
-    out.println("<weight>.25</weight>");
-    out.println("<minResources>1024mb,0vcores</minResources>");
-    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
-    out.println("</queue>");
-    out.println("</queue>");
-    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
-    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
-    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    // Create four nodes(3G each)
-    RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
-            "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    RMNode node2 =
-        MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
-            "127.0.0.2");
-    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
-    scheduler.handle(nodeEvent2);
-
-    RMNode node3 =
-        MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
-            "127.0.0.3");
-    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
-    scheduler.handle(nodeEvent3);
-
-    RMNode node4 =
-        MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
-            "127.0.0.4");
-    NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
-    scheduler.handle(nodeEvent4);
-
-    // Submit apps to queueB, queueC, queueD
-    // now all resource of the cluster is occupied
-
-    ApplicationAttemptId app1 =
-        createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 1);
-    ApplicationAttemptId app2 =
-        createSchedulingRequest(1 * 1024, "parentQueue1.queueC", "user1", 4, 2);
-    ApplicationAttemptId app3 =
-        createSchedulingRequest(1 * 1024, "parentQueue2.queueD", "user1", 4, 3);
-    scheduler.update();
-
-    // Sufficient node check-ins to fully schedule containers
-    for (int i = 0; i < 3; i++) {
-      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
-      scheduler.handle(nodeUpdate1);
-
-      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
-      scheduler.handle(nodeUpdate2);
-
-      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
-      scheduler.handle(nodeUpdate3);
-
-      NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
-      scheduler.handle(nodeUpdate4);
-    }
-
-    assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-
-    // Now new requests arrive from queues A
-    ApplicationAttemptId app4 =
-        createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
-    scheduler.update();
-    FSLeafQueue schedA =
-        scheduler.getQueueManager().getLeafQueue("queueA", true);
-
-    // After minSharePreemptionTime has passed, resource deficit is 2G
-    clock.tickSec(6);
-    assertEquals(2048,
-        scheduler.resourceDeficit(schedA, clock.getTime()).getMemorySize());
-
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
-    // now none app is selected to be preempted
-    assertTrue("App1 should have container to be preempted",
-        Collections.disjoint(
-            scheduler.getSchedulerApp(app1).getLiveContainers(),
-            scheduler.getSchedulerApp(app1).getPreemptionContainers()));
-    assertTrue("App2 should not have container to be preempted",
-        Collections.disjoint(
-            scheduler.getSchedulerApp(app2).getLiveContainers(),
-            scheduler.getSchedulerApp(app2).getPreemptionContainers()));
-    assertTrue("App3 should not have container to be preempted",
-        Collections.disjoint(
-            scheduler.getSchedulerApp(app3).getLiveContainers(),
-            scheduler.getSchedulerApp(app3).getPreemptionContainers()));
-    // Pretend 20 seconds have passed
-    clock.tickSec(20);
-    scheduler.preemptResources(Resources.createResource(2 * 1024));
-    for (int i = 0; i < 3; i++) {
-      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
-      scheduler.handle(nodeUpdate1);
-
-      NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
-      scheduler.handle(nodeUpdate2);
-
-      NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
-      scheduler.handle(nodeUpdate3);
-
-      NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
-      scheduler.handle(nodeUpdate4);
-    }
-    // after preemption
-    assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
-    assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-    assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-    assertEquals(0, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-    stopResourceManager();
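+  /**
+   * Submits the apps to two sibling leaf queues under root.preemptable
+   * and expects preemption in both the fairshare and minshare cases.
+   */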
+  public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
+    setupCluster();
+    submitApps("root.preemptable.child-1", "root.preemptable.child-2");
+    verifyPreemption();
  }

  @Test
-  public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
-    startResourceManagerWithRealFairScheduler();
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"default\">");
-    out.println("</queue>");
-    out.println("<queue name=\"queueA\">");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<queue name=\"queueB1\">");
-    out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB2\">");
-    out.println("</queue>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueC\">");
-    out.println("</queue>");
-    out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
-    out.print("<defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>");
-    out.print("<fairSharePreemptionTimeout>40</fairSharePreemptionTimeout>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    // Check the min/fair share preemption timeout for each queue
-    QueueManager queueMgr = scheduler.getQueueManager();
-    assertEquals(30000, queueMgr.getQueue("root")
-        .getFairSharePreemptionTimeout());
-    assertEquals(30000, queueMgr.getQueue("default")
-        .getFairSharePreemptionTimeout());
-    assertEquals(30000, queueMgr.getQueue("queueA")
-        .getFairSharePreemptionTimeout());
-    assertEquals(30000, queueMgr.getQueue("queueB")
-        .getFairSharePreemptionTimeout());
-    assertEquals(30000, queueMgr.getQueue("queueB.queueB1")
-        .getFairSharePreemptionTimeout());
-    assertEquals(30000, queueMgr.getQueue("queueB.queueB2")
-        .getFairSharePreemptionTimeout());
-    assertEquals(30000, queueMgr.getQueue("queueC")
-        .getFairSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("root")
-        .getMinSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("default")
-        .getMinSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("queueA")
-        .getMinSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("queueB")
-        .getMinSharePreemptionTimeout());
-    assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
-        .getMinSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("queueB.queueB2")
-        .getMinSharePreemptionTimeout());
-    assertEquals(15000, queueMgr.getQueue("queueC")
-        .getMinSharePreemptionTimeout());
-
-    // If both exist, we take the default one
-    out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"default\">");
-    out.println("</queue>");
-    out.println("<queue name=\"queueA\">");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<queue name=\"queueB1\">");
-    out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB2\">");
-    out.println("</queue>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueC\">");
-    out.println("</queue>");
-    out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
-    out.print("<defaultFairSharePreemptionTimeout>25</defaultFairSharePreemptionTimeout>");
-    out.print("<fairSharePreemptionTimeout>30</fairSharePreemptionTimeout>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    assertEquals(25000, queueMgr.getQueue("root")
-        .getFairSharePreemptionTimeout());
-    stopResourceManager();
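+  /**
+   * Submits the apps to two leaf queues under different parents
+   * (root.preemptable and root.nonpreemptable) and expects preemption
+   * to occur across the parent-queue boundary.
+   */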
+  public void testPreemptionBetweenNonSiblingQueues() throws Exception {
+    setupCluster();
+    submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
+    verifyPreemption();
  }

-  @Test(timeout = 5000)
-  public void testRecoverRequestAfterPreemption() throws Exception {
-    startResourceManagerWithRealFairScheduler();
-    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
-
-    ControlledClock clock = new ControlledClock();
-    scheduler.setClock(clock);
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    SchedulerRequestKey schedulerKey = TestUtils.toSchedulerKey(20);
-    String host = "127.0.0.1";
-    int GB = 1024;
-
-    // Create Node and raised Node Added event
-    RMNode node = MockNodes.newNodeInfo(1,
-        Resources.createResource(16 * 1024, 4), 0, host);
-    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
-    scheduler.handle(nodeEvent);
-
-    // Create 3 container requests and place it in ask
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
-        schedulerKey.getPriority().getPriority(), 1, true);
-    ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
-        node.getRackName(), schedulerKey.getPriority().getPriority(), 1,
-        true);
-    ResourceRequest offRackRequest = createResourceRequest(GB, 1,
-        ResourceRequest.ANY, schedulerKey.getPriority().getPriority(), 1, true);
-    ask.add(nodeLocalRequest);
-    ask.add(rackLocalRequest);
-    ask.add(offRackRequest);
-
-    // Create Request and update
-    ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
-        "user1", ask);
-    scheduler.update();
-
-    // Sufficient node check-ins to fully schedule containers
-    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
-    scheduler.handle(nodeUpdate);
-
-    assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
-        .size());
-    SchedulerApplicationAttempt app = scheduler.getSchedulerApp(appAttemptId);
-
-    // ResourceRequest will be empty once NodeUpdate is completed
-    Assert.assertNull(app.getResourceRequest(schedulerKey, host));
-
-    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
-    RMContainer rmContainer = app.getRMContainer(containerId1);
-
-    // Create a preempt event and register for preemption
-    scheduler.warnOrKillContainer(rmContainer);
-
-    // Wait for few clock ticks
-    clock.tickSec(5);
-
-    // preempt now
-    scheduler.warnOrKillContainer(rmContainer);
-
-    // Trigger container rescheduled event
-    scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer,
-        SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
-
-    List<ResourceRequest> requests = rmContainer.getResourceRequests();
-    // Once recovered, resource request will be present again in app
-    Assert.assertEquals(3, requests.size());
-    for (ResourceRequest request : requests) {
-      Assert.assertEquals(1,
-          app.getResourceRequest(schedulerKey, request.getResourceName())
-              .getNumContainers());
-    }
-
-    // Send node heartbeat
-    scheduler.update();
-    scheduler.handle(nodeUpdate);
-
-    List<Container> containers = scheduler.allocate(appAttemptId,
-        Collections.<ResourceRequest> emptyList(),
-        Collections.<ContainerId> emptyList(), null, null, null, null).getContainers();
-
-    // Now with updated ResourceRequest, a container is allocated for AM.
-    Assert.assertTrue(containers.size() == 1);
-    stopResourceManager();
+  @Test
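+  /**
+   * Submits one app to root.nonpreemptable.child-1 and another to
+   * root.preemptable.child-1, and verifies that no containers are preempted.
+   * The nonpreemptable parent is assumed to disallow preemption in the
+   * allocation file written by setupCluster().
+   */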
+  public void testNoPreemptionFromDisallowedQueue() throws Exception {
+    setupCluster();
+    submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
+    verifyNoPreemption();
  }
}