@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import static org.junit.Assert.assertEquals;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
@@ -1676,4 +1679,213 @@ public class TestNodeLabelContainerAllocation {
     checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
         cs.getApplicationAttempt(am1.getApplicationAttemptId()));
   }
+
+  @Test
+  public void testQueueMetricsWithLabels() throws Exception {
+    /**
+     * Test case: use the following queue structure:
+     *
+     * <pre>
+     *          root
+     *         /    \
+     *        a      b
+     *       (x)    (x)
+     * </pre>
+     *
+     * Both a and b can access x, and both have max-capacity-on-x = 50.
+     *
+     * App in a asks for containers on partition x; verify that queue a's
+     * metrics report the allocated and available resources correctly.
+     */
+
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
+        this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { "a", "b" });
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(queueA, 25);
+    csConf.setAccessibleNodeLabels(queueA, toSet("x"));
+    csConf.setCapacityByLabel(queueA, "x", 50);
+    csConf.setMaximumCapacityByLabel(queueA, "x", 50);
+    final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(queueB, 75);
+    csConf.setAccessibleNodeLabels(queueB, toSet("x"));
+    csConf.setCapacityByLabel(queueB, "x", 50);
+    csConf.setMaximumCapacityByLabel(queueB, "x", 50);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(
+        ImmutableSet.of(NodeLabel.newInstance("x", false)));
+    mgr.addToCluserNodeLabels(
+        ImmutableSet.of(NodeLabel.newInstance("y", false)));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y
+    // app1 -> a
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // app1 asks for 5 partition=x containers
+    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
+    // NM1 does 50 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // app1 fills queue a's maximum capacity (50%) on partition x
+    Assert.assertEquals(5, schedulerNode1.getNumContainers());
+
+    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
+        .getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(5 * GB, reportNm1.getUsedResource().getMemorySize());
+    Assert.assertEquals(5 * GB,
+        reportNm1.getAvailableResource().getMemorySize());
+
+    SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
+        .getNodeReport(nm2.getNodeId());
+    Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
+    Assert.assertEquals(10 * GB,
+        reportNm2.getAvailableResource().getMemorySize());
+
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
+    assertEquals(0 * GB, leafQueue.getMetrics().getAvailableMB());
+    assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB());
+    rm1.close();
+  }
+
+  @Test
+  public void testQueueMetricsWithLabelsOnDefaultLabelNode() throws Exception {
+    /**
+     * Test case: use the following queue structure:
+     *
+     * <pre>
+     *          root
+     *         /    \
+     *        a      b
+     *       (x)    (x)
+     * </pre>
+     *
+     * Both a and b can access x, and both have max-capacity-on-x = 50.
+     *
+     * App in a asks for default-partition containers; x being non-exclusive,
+     * they may land on the x node. Verify queue a's metrics for both.
+     */
+
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
+        this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { "a", "b" });
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(queueA, 25);
+    csConf.setAccessibleNodeLabels(queueA, toSet("x"));
+    csConf.setCapacityByLabel(queueA, "x", 50);
+    csConf.setMaximumCapacityByLabel(queueA, "x", 50);
+    final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(queueB, 75);
+    csConf.setAccessibleNodeLabels(queueB, toSet("x"));
+    csConf.setCapacityByLabel(queueB, "x", 50);
+    csConf.setMaximumCapacityByLabel(queueB, "x", 50);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(
+        ImmutableSet.of(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <no_label>
+    // app1 -> a
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+    // app1 asks for 3 default-partition containers
+ am1.allocate("*", 1 * GB, 3, new ArrayList<ContainerId>());
|
|
|
+
|
|
|
+    // NM1 does 50 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // app1's asks are satisfied on partition x via non-exclusive allocation
+    Assert.assertEquals(3, schedulerNode1.getNumContainers());
+
+    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
+        .getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(3 * GB, reportNm1.getUsedResource().getMemorySize());
+    Assert.assertEquals(7 * GB,
+        reportNm1.getAvailableResource().getMemorySize());
+
+    SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
+        .getNodeReport(nm2.getNodeId());
+    Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize());
+    Assert.assertEquals(9 * GB,
+        reportNm2.getAvailableResource().getMemorySize());
+
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
+    double delta = 0.0001;
+    // 3GB is used from label x quota. 1.5GB is remaining from default label.
+    // 2GB is remaining from label x.
+    assertEquals(3.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(4 * GB, leafQueue.getMetrics().getAllocatedMB());
+
+    // app1 asks for 5 more default-partition containers
+ am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
|
|
|
+
|
|
|
+    // NM2 does one heartbeat
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // app1 gets one more container in the default partition
+    Assert.assertEquals(2, schedulerNode2.getNumContainers());
+
+    // 3GB is used from label x quota. 2GB used from default label.
+    // So total 2.5GB is remaining.
+    assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB());
+
+    rm1.close();
+  }
 }