|
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|
|
import org.apache.hadoop.yarn.api.records.QueueState;
|
|
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
|
@@ -102,6 +103,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.CSQueueMetricsForCustomResources;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
@@ -109,6 +111,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
|
|
@@ -138,6 +141,7 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
|
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
|
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
|
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
+import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
|
|
|
import org.apache.log4j.Level;
|
|
|
import org.apache.log4j.LogManager;
|
|
|
import org.apache.log4j.Logger;
|
|
@@ -209,6 +213,7 @@ public class TestCapacityScheduler {
|
|
|
|
|
|
@Before
|
|
|
public void setUp() throws Exception {
|
|
|
+ ResourceUtils.resetResourceTypes(new Configuration());
|
|
|
resourceManager = new ResourceManager() {
|
|
|
@Override
|
|
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
|
@@ -5081,23 +5086,109 @@ public class TestCapacityScheduler {
|
|
|
|
|
|
@Test
|
|
|
public void testCSQueueMetrics() throws Exception {
|
|
|
- CapacityScheduler cs = new CapacityScheduler();
|
|
|
- cs.setConf(new YarnConfiguration());
|
|
|
- cs.setRMContext(resourceManager.getRMContext());
|
|
|
- CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
|
|
- setupQueueConfiguration(conf);
|
|
|
- cs.init(conf);
|
|
|
- cs.start();
|
|
|
|
|
|
- RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(50 * GB), 1, "n1");
|
|
|
- RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(50 * GB), 2, "n2");
|
|
|
+ // Initialize resource map
|
|
|
+ Map<String, ResourceInformation> riMap = new HashMap<>();
|
|
|
+
|
|
|
+ // Initialize mandatory resources
|
|
|
+ ResourceInformation memory =
|
|
|
+ ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(),
|
|
|
+ ResourceInformation.MEMORY_MB.getUnits(),
|
|
|
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
|
|
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
|
|
+ ResourceInformation vcores =
|
|
|
+ ResourceInformation.newInstance(ResourceInformation.VCORES.getName(),
|
|
|
+ ResourceInformation.VCORES.getUnits(),
|
|
|
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
|
|
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
|
|
+ riMap.put(ResourceInformation.MEMORY_URI, memory);
|
|
|
+ riMap.put(ResourceInformation.VCORES_URI, vcores);
|
|
|
+ riMap.put(TestQueueMetricsForCustomResources.CUSTOM_RES_1,
|
|
|
+ ResourceInformation.newInstance(
|
|
|
+ TestQueueMetricsForCustomResources.CUSTOM_RES_1, "", 1, 10));
|
|
|
+
|
|
|
+ ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
|
|
+
|
|
|
+ CapacitySchedulerConfiguration csConf =
|
|
|
+ new CapacitySchedulerConfiguration();
|
|
|
+ csConf.setResourceComparator(DominantResourceCalculator.class);
|
|
|
+
|
|
|
+ csConf.set(YarnConfiguration.RESOURCE_TYPES,
|
|
|
+ TestQueueMetricsForCustomResources.CUSTOM_RES_1);
|
|
|
+
|
|
|
+ setupQueueConfiguration(csConf);
|
|
|
+
|
|
|
+ YarnConfiguration conf = new YarnConfiguration(csConf);
|
|
|
+
|
|
|
+ // Don't reset resource types since we have already configured resource
|
|
|
+ // types
|
|
|
+ conf.setBoolean(TestResourceUtils.TEST_CONF_RESET_RESOURCE_TYPES, false);
|
|
|
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
+ ResourceScheduler.class);
|
|
|
+
|
|
|
+ MockRM rm = new MockRM(conf);
|
|
|
+ rm.start();
|
|
|
+
|
|
|
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
|
|
+
|
|
|
+ RMNode n1 = MockNodes.newNodeInfo(0,
|
|
|
+ MockNodes.newResource(50 * GB, 50,
|
|
|
+ ImmutableMap.<String, String> builder()
|
|
|
+ .put(TestQueueMetricsForCustomResources.CUSTOM_RES_1,
|
|
|
+ String.valueOf(1000))
|
|
|
+ .build()),
|
|
|
+ 1, "n1");
|
|
|
+ RMNode n2 = MockNodes.newNodeInfo(0,
|
|
|
+ MockNodes.newResource(50 * GB, 50,
|
|
|
+ ImmutableMap.<String, String> builder()
|
|
|
+ .put(TestQueueMetricsForCustomResources.CUSTOM_RES_1,
|
|
|
+ String.valueOf(2000))
|
|
|
+ .build()),
|
|
|
+ 2, "n2");
|
|
|
cs.handle(new NodeAddedSchedulerEvent(n1));
|
|
|
cs.handle(new NodeAddedSchedulerEvent(n2));
|
|
|
|
|
|
+ Map<String, Long> guaranteedCapA11 =
|
|
|
+ ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a1")
|
|
|
+ .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
+ .getGuaranteedCapacity();
|
|
|
+ assertEquals(94, guaranteedCapA11
|
|
|
+ .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
+ Map<String, Long> maxCapA11 =
|
|
|
+ ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a1")
|
|
|
+ .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
+ .getMaxCapacity();
|
|
|
+ assertEquals(3000, maxCapA11
|
|
|
+ .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
+
|
|
|
assertEquals(10240, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getGuaranteedMB());
|
|
|
assertEquals(71680, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB());
|
|
|
assertEquals(102400, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB());
|
|
|
assertEquals(102400, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB());
|
|
|
+ Map<String, Long> guaranteedCapA =
|
|
|
+ ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a")
|
|
|
+ .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
+ .getGuaranteedCapacity();
|
|
|
+ assertEquals(314, guaranteedCapA
|
|
|
+ .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
+ Map<String, Long> maxCapA =
|
|
|
+ ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a")
|
|
|
+ .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
+ .getMaxCapacity();
|
|
|
+ assertEquals(3000, maxCapA
|
|
|
+ .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
+ Map<String, Long> guaranteedCapB1 =
|
|
|
+ ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1")
|
|
|
+ .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
+ .getGuaranteedCapacity();
|
|
|
+ assertEquals(2126, guaranteedCapB1
|
|
|
+ .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
+ Map<String, Long> maxCapB1 =
|
|
|
+ ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1")
|
|
|
+ .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
+ .getMaxCapacity();
|
|
|
+ assertEquals(3000, maxCapB1
|
|
|
+ .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
|
|
|
// Remove a node, metrics should be updated
|
|
|
cs.handle(new NodeRemovedSchedulerEvent(n2));
|
|
@@ -5105,6 +5196,31 @@ public class TestCapacityScheduler {
|
|
|
assertEquals(35840, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB());
|
|
|
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB());
|
|
|
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB());
|
|
|
+ Map<String, Long> guaranteedCapA1 =
|
|
|
+ ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a")
|
|
|
+ .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
+ .getGuaranteedCapacity();
|
|
|
+
|
|
|
+ assertEquals(104, guaranteedCapA1
|
|
|
+ .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
+ Map<String, Long> maxCapA1 =
|
|
|
+ ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a")
|
|
|
+ .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
+ .getMaxCapacity();
|
|
|
+ assertEquals(1000, maxCapA1
|
|
|
+ .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
+ Map<String, Long> guaranteedCapB11 =
|
|
|
+ ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1")
|
|
|
+ .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
+ .getGuaranteedCapacity();
|
|
|
+ assertEquals(708, guaranteedCapB11
|
|
|
+ .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
+ Map<String, Long> maxCapB11 =
|
|
|
+ ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1")
|
|
|
+ .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
+ .getMaxCapacity();
|
|
|
+ assertEquals(1000, maxCapB11
|
|
|
+ .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
assertEquals(A_CAPACITY / 100, ((CSQueueMetrics)cs.getQueue("a")
|
|
|
.getMetrics()).getGuaranteedCapacity(), DELTA);
|
|
|
assertEquals(A_CAPACITY / 100, ((CSQueueMetrics)cs.getQueue("a")
|
|
@@ -5123,20 +5239,49 @@ public class TestCapacityScheduler {
|
|
|
.getMaxAbsoluteCapacity(), DELTA);
|
|
|
|
|
|
// Add child queue to a, and reinitialize. Metrics should be updated
|
|
|
- conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a", new String[] {"a1", "a2", "a3"} );
|
|
|
- conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a2", 30.0f);
|
|
|
- conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 40.0f);
|
|
|
- conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 50.0f);
|
|
|
-
|
|
|
- cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
|
|
|
- null, new RMContainerTokenSecretManager(conf),
|
|
|
- new NMTokenSecretManagerInRM(conf),
|
|
|
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a",
|
|
|
+ new String[] {"a1", "a2", "a3"});
|
|
|
+ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a2", 29.5f);
|
|
|
+ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 40.5f);
|
|
|
+ csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3",
|
|
|
+ 50.0f);
|
|
|
+
|
|
|
+ cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, null,
|
|
|
+ null, new RMContainerTokenSecretManager(csConf),
|
|
|
+ new NMTokenSecretManagerInRM(csConf),
|
|
|
new ClientToAMTokenSecretManagerInRM(), null));
|
|
|
|
|
|
assertEquals(1024, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getGuaranteedMB());
|
|
|
assertEquals(2048, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getGuaranteedMB());
|
|
|
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getMaxCapacityMB());
|
|
|
assertEquals(25600, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getMaxCapacityMB());
|
|
|
+
|
|
|
+ Map<String, Long> guaranteedCapA2 =
|
|
|
+ ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a2")
|
|
|
+ .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
+ .getGuaranteedCapacity();
|
|
|
+ assertEquals(30, guaranteedCapA2
|
|
|
+ .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
+ Map<String, Long> maxCapA2 =
|
|
|
+ ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a2")
|
|
|
+ .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
+ .getMaxCapacity();
|
|
|
+ assertEquals(1000, maxCapA2
|
|
|
+ .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
+
|
|
|
+ Map<String, Long> guaranteedCapA3 =
|
|
|
+ ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a3")
|
|
|
+ .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
+ .getGuaranteedCapacity();
|
|
|
+ assertEquals(42, guaranteedCapA3
|
|
|
+ .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
+ Map<String, Long> maxCapA3 =
|
|
|
+ ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a3")
|
|
|
+ .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
+ .getMaxCapacity();
|
|
|
+ assertEquals(500, maxCapA3
|
|
|
+ .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
+ rm.stop();
|
|
|
}
|
|
|
|
|
|
@Test
|