@@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuot
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyWithExclusivePartitions;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
@@ -132,6 +133,8 @@ public class TestLeafQueue {
   final static int GB = 1024;
   final static String DEFAULT_RACK = "/default";
 
+  private final static String LABEL = "test";
+
   private final ResourceCalculator resourceCalculator =
       new DefaultResourceCalculator();
 
@@ -140,14 +143,19 @@ public class TestLeafQueue {
 
   @Before
   public void setUp() throws Exception {
-    setUpInternal(resourceCalculator);
+    setUpInternal(resourceCalculator, false);
   }
 
   private void setUpWithDominantResourceCalculator() throws Exception {
-    setUpInternal(dominantResourceCalculator);
+    setUpInternal(dominantResourceCalculator, false);
+  }
+
+  private void setUpWithNodeLabels() throws Exception {
+    setUpInternal(resourceCalculator, true);
   }
 
-  private void setUpInternal(ResourceCalculator rC) throws Exception {
+  private void setUpInternal(ResourceCalculator rC, boolean withNodeLabels)
+      throws Exception {
     CapacityScheduler spyCs = new CapacityScheduler();
     queues = new HashMap<String, CSQueue>();
     cs = spy(spyCs);
@@ -172,7 +180,7 @@ public class TestLeafQueue {
|
|
csConf.setBoolean(CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES,
|
|
csConf.setBoolean(CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES,
|
|
false);
|
|
false);
|
|
final String newRoot = "root" + System.currentTimeMillis();
|
|
final String newRoot = "root" + System.currentTimeMillis();
|
|
- setupQueueConfiguration(csConf, newRoot);
|
|
|
|
|
|
+ setupQueueConfiguration(csConf, newRoot, withNodeLabels);
|
|
YarnConfiguration conf = new YarnConfiguration();
|
|
YarnConfiguration conf = new YarnConfiguration();
|
|
cs.setConf(conf);
|
|
cs.setConf(conf);
|
|
|
|
|
|
@@ -228,24 +236,39 @@ public class TestLeafQueue {
   private static final String E = "e";
   private void setupQueueConfiguration(
       CapacitySchedulerConfiguration conf,
-      final String newRoot) {
+      final String newRoot, boolean withNodeLabels) {
 
     // Define top-level queues
     conf.setQueues(ROOT, new String[] {newRoot});
     conf.setMaximumCapacity(ROOT, 100);
     conf.setAcl(ROOT,
         QueueACL.SUBMIT_APPLICATIONS, " ");
+    if (withNodeLabels) {
+      conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, LABEL, 100);
+      conf.setMaximumCapacityByLabel(CapacitySchedulerConfiguration.ROOT,
+          LABEL, 100);
+    }
 
     final String Q_newRoot = ROOT + "." + newRoot;
     conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E});
     conf.setCapacity(Q_newRoot, 100);
     conf.setMaximumCapacity(Q_newRoot, 100);
     conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
+    if (withNodeLabels) {
+      conf.setAccessibleNodeLabels(Q_newRoot, Collections.singleton(LABEL));
+      conf.setCapacityByLabel(Q_newRoot, LABEL, 100);
+      conf.setMaximumCapacityByLabel(Q_newRoot, LABEL, 100);
+    }
 
     final String Q_A = Q_newRoot + "." + A;
     conf.setCapacity(Q_A, 8.5f);
     conf.setMaximumCapacity(Q_A, 20);
     conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
+    if (withNodeLabels) {
+      conf.setAccessibleNodeLabels(Q_A, Collections.singleton(LABEL));
+      conf.setCapacityByLabel(Q_A, LABEL, 100);
+      conf.setMaximumCapacityByLabel(Q_A, LABEL, 100);
+    }
 
     final String Q_B = Q_newRoot + "." + B;
     conf.setCapacity(Q_B, 80);
@@ -3097,7 +3120,7 @@ public class TestLeafQueue {
|
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
|
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
|
|
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
|
|
final String newRootName = "root" + System.currentTimeMillis();
|
|
final String newRootName = "root" + System.currentTimeMillis();
|
|
- setupQueueConfiguration(csConf, newRootName);
|
|
|
|
|
|
+ setupQueueConfiguration(csConf, newRootName, false);
|
|
|
|
|
|
Resource clusterResource = Resources.createResource(100 * 16 * GB,
|
|
Resource clusterResource = Resources.createResource(100 * 16 * GB,
|
|
100 * 32);
|
|
100 * 32);
|
|
@@ -3289,6 +3312,116 @@ public class TestLeafQueue {
     Assert.assertEquals(3 * GB, app_0.getCurrentConsumption().getMemorySize());
   }
 
+  @Test
+  public void testFifoWithPartitionsAssignment() throws Exception {
+    setUpWithNodeLabels();
+
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+    OrderingPolicy<FiCaSchedulerApp> policy =
+        new FifoOrderingPolicyWithExclusivePartitions<>();
+    policy.configure(Collections.singletonMap(
+        YarnConfiguration.EXCLUSIVE_ENFORCED_PARTITIONS_SUFFIX, LABEL));
+    a.setOrderingPolicy(policy);
+    String host00 = "127.0.0.1";
+    String rack0 = "rack_0";
+    FiCaSchedulerNode node00 = TestUtils.getMockNode(host00, rack0, 0,
+        16 * GB);
+    when(node00.getPartition()).thenReturn(LABEL);
+    String host01 = "127.0.0.2";
+    FiCaSchedulerNode node01 = TestUtils.getMockNode(host01, rack0, 0,
+        16 * GB);
+    when(node01.getPartition()).thenReturn("");
+
+    final int numNodes = 4;
+    Resource clusterResource = Resources.createResource(numNodes * (16 * GB),
+        numNodes * 16);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    String user0 = "user_0";
+
+    final ApplicationAttemptId appAttemptId0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app0 = spy(new FiCaSchedulerApp(appAttemptId0, user0, a,
+        mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5),
+        false));
+    a.submitApplicationAttempt(app0, user0);
+
+    final ApplicationAttemptId appAttemptId1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app1 = spy(new FiCaSchedulerApp(appAttemptId1, user0, a,
+        mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3),
+        false));
+    when(app1.getPartition()).thenReturn(LABEL);
+    a.submitApplicationAttempt(app1, user0);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app0.getApplicationAttemptId(), app0, app1.getApplicationAttemptId(),
+        app1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node00.getNodeID(),
+        node00, node01.getNodeID(), node01);
+
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app0Requests = new ArrayList<>();
+    List<ResourceRequest> app1Requests = new ArrayList<>();
+
+    app0Requests.clear();
+    app0Requests.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, priority,
+            recordFactory));
+    app0.updateResourceRequests(app0Requests);
+
+    app1Requests.clear();
+    app1Requests.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory, LABEL));
+    app1.updateResourceRequests(app1Requests);
+
+    // app_1 will get containers since it is exclusive-enforced
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node00,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(1 * GB, app1.getSchedulingResourceUsage()
+        .getUsed(LABEL).getMemorySize());
+    // app_0 should not get resources from node_0_0 since the labels
+    // don't match
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node00,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(0 * GB, app0.getCurrentConsumption().getMemorySize());
+
+    app1Requests.clear();
+    app1Requests.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory, LABEL));
+    app1.updateResourceRequests(app1Requests);
+
+    // When node_0_1 heartbeats, app_0 should get containers
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node01,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(2 * GB, app0.getCurrentConsumption().getMemorySize());
+    Assert.assertEquals(1 * GB, app1.getSchedulingResourceUsage()
+        .getUsed(LABEL).getMemorySize());
+
+    app0Requests.clear();
+    app0Requests.add(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 1 * GB, 1, true, priority,
+            recordFactory));
+    app0.updateResourceRequests(app0Requests);
+
+    // When node_0_0 heartbeats, app_1 should get containers again
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node00,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    Assert.assertEquals(2 * GB, app0.getCurrentConsumption().getMemorySize());
+    Assert.assertEquals(2 * GB, app1.getSchedulingResourceUsage()
+        .getUsed(LABEL).getMemorySize());
+  }
+
   @Test
   public void testConcurrentAccess() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();