|
@@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|
|
import org.apache.hadoop.yarn.api.records.QueueState;
|
|
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
-import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
|
@@ -109,7 +108,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMW
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
|
|
|
+
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
|
|
@@ -128,7 +127,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdate
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.CSQueueMetricsForCustomResources;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
@@ -136,7 +134,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;
|
|
@@ -193,7 +190,6 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
|
|
|
|
|
@Before
|
|
|
public void setUp() throws Exception {
|
|
|
- ResourceUtils.resetResourceTypes(new Configuration());
|
|
|
resourceManager = new ResourceManager() {
|
|
|
@Override
|
|
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
|
@@ -5182,109 +5178,23 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
|
|
|
|
|
@Test
|
|
|
public void testCSQueueMetrics() throws Exception {
|
|
|
+ CapacityScheduler cs = new CapacityScheduler();
|
|
|
+ cs.setConf(new YarnConfiguration());
|
|
|
+ cs.setRMContext(resourceManager.getRMContext());
|
|
|
+ CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
|
|
+ setupQueueConfiguration(conf);
|
|
|
+ cs.init(conf);
|
|
|
+ cs.start();
|
|
|
|
|
|
- // Initialize resource map
|
|
|
- Map<String, ResourceInformation> riMap = new HashMap<>();
|
|
|
-
|
|
|
- // Initialize mandatory resources
|
|
|
- ResourceInformation memory =
|
|
|
- ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(),
|
|
|
- ResourceInformation.MEMORY_MB.getUnits(),
|
|
|
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
|
|
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
|
|
- ResourceInformation vcores =
|
|
|
- ResourceInformation.newInstance(ResourceInformation.VCORES.getName(),
|
|
|
- ResourceInformation.VCORES.getUnits(),
|
|
|
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
|
|
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
|
|
- riMap.put(ResourceInformation.MEMORY_URI, memory);
|
|
|
- riMap.put(ResourceInformation.VCORES_URI, vcores);
|
|
|
- riMap.put(TestQueueMetricsForCustomResources.CUSTOM_RES_1,
|
|
|
- ResourceInformation.newInstance(
|
|
|
- TestQueueMetricsForCustomResources.CUSTOM_RES_1, "", 1, 10));
|
|
|
-
|
|
|
- ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
|
|
-
|
|
|
- CapacitySchedulerConfiguration csConf =
|
|
|
- new CapacitySchedulerConfiguration();
|
|
|
- csConf.setResourceComparator(DominantResourceCalculator.class);
|
|
|
-
|
|
|
- csConf.set(YarnConfiguration.RESOURCE_TYPES,
|
|
|
- TestQueueMetricsForCustomResources.CUSTOM_RES_1);
|
|
|
-
|
|
|
- setupQueueConfiguration(csConf);
|
|
|
-
|
|
|
- YarnConfiguration conf = new YarnConfiguration(csConf);
|
|
|
-
|
|
|
- // Don't reset resource types since we have already configured resource
|
|
|
- // types
|
|
|
- conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false);
|
|
|
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
- ResourceScheduler.class);
|
|
|
-
|
|
|
- MockRM rm = new MockRM(conf);
|
|
|
- rm.start();
|
|
|
-
|
|
|
- CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
|
|
-
|
|
|
- RMNode n1 = MockNodes.newNodeInfo(0,
|
|
|
- MockNodes.newResource(50 * GB, 50,
|
|
|
- ImmutableMap.<String, String> builder()
|
|
|
- .put(TestQueueMetricsForCustomResources.CUSTOM_RES_1,
|
|
|
- String.valueOf(1000))
|
|
|
- .build()),
|
|
|
- 1, "n1");
|
|
|
- RMNode n2 = MockNodes.newNodeInfo(0,
|
|
|
- MockNodes.newResource(50 * GB, 50,
|
|
|
- ImmutableMap.<String, String> builder()
|
|
|
- .put(TestQueueMetricsForCustomResources.CUSTOM_RES_1,
|
|
|
- String.valueOf(2000))
|
|
|
- .build()),
|
|
|
- 2, "n2");
|
|
|
+ RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(50 * GB), 1, "n1");
|
|
|
+ RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(50 * GB), 2, "n2");
|
|
|
cs.handle(new NodeAddedSchedulerEvent(n1));
|
|
|
cs.handle(new NodeAddedSchedulerEvent(n2));
|
|
|
|
|
|
- Map<String, Long> guaranteedCapA11 =
|
|
|
- ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a1")
|
|
|
- .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
- .getGuaranteedCapacity();
|
|
|
- assertEquals(94, guaranteedCapA11
|
|
|
- .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
- Map<String, Long> maxCapA11 =
|
|
|
- ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a1")
|
|
|
- .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
- .getMaxCapacity();
|
|
|
- assertEquals(3000, maxCapA11
|
|
|
- .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
-
|
|
|
assertEquals(10240, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getGuaranteedMB());
|
|
|
assertEquals(71680, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB());
|
|
|
assertEquals(102400, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB());
|
|
|
assertEquals(102400, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB());
|
|
|
- Map<String, Long> guaranteedCapA =
|
|
|
- ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a")
|
|
|
- .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
- .getGuaranteedCapacity();
|
|
|
- assertEquals(314, guaranteedCapA
|
|
|
- .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
- Map<String, Long> maxCapA =
|
|
|
- ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a")
|
|
|
- .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
- .getMaxCapacity();
|
|
|
- assertEquals(3000, maxCapA
|
|
|
- .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
- Map<String, Long> guaranteedCapB1 =
|
|
|
- ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1")
|
|
|
- .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
- .getGuaranteedCapacity();
|
|
|
- assertEquals(2126, guaranteedCapB1
|
|
|
- .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
- Map<String, Long> maxCapB1 =
|
|
|
- ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1")
|
|
|
- .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
- .getMaxCapacity();
|
|
|
- assertEquals(3000, maxCapB1
|
|
|
- .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
|
|
|
// Remove a node, metrics should be updated
|
|
|
cs.handle(new NodeRemovedSchedulerEvent(n2));
|
|
@@ -5292,31 +5202,6 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
|
|
assertEquals(35840, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB());
|
|
|
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB());
|
|
|
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB());
|
|
|
- Map<String, Long> guaranteedCapA1 =
|
|
|
- ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a")
|
|
|
- .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
- .getGuaranteedCapacity();
|
|
|
-
|
|
|
- assertEquals(104, guaranteedCapA1
|
|
|
- .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
- Map<String, Long> maxCapA1 =
|
|
|
- ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a")
|
|
|
- .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
- .getMaxCapacity();
|
|
|
- assertEquals(1000, maxCapA1
|
|
|
- .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
- Map<String, Long> guaranteedCapB11 =
|
|
|
- ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1")
|
|
|
- .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
- .getGuaranteedCapacity();
|
|
|
- assertEquals(708, guaranteedCapB11
|
|
|
- .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
- Map<String, Long> maxCapB11 =
|
|
|
- ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1")
|
|
|
- .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
- .getMaxCapacity();
|
|
|
- assertEquals(1000, maxCapB11
|
|
|
- .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
assertEquals(A_CAPACITY / 100, ((CSQueueMetrics)cs.getQueue("a")
|
|
|
.getMetrics()).getGuaranteedCapacity(), DELTA);
|
|
|
assertEquals(A_CAPACITY / 100, ((CSQueueMetrics)cs.getQueue("a")
|
|
@@ -5335,49 +5220,20 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
|
|
.getMaxAbsoluteCapacity(), DELTA);
|
|
|
|
|
|
// Add child queue to a, and reinitialize. Metrics should be updated
|
|
|
- csConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a",
|
|
|
- new String[] {"a1", "a2", "a3"});
|
|
|
- csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a2", 29.5f);
|
|
|
- csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 40.5f);
|
|
|
- csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3",
|
|
|
- 50.0f);
|
|
|
-
|
|
|
- cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, null,
|
|
|
- null, new RMContainerTokenSecretManager(csConf),
|
|
|
- new NMTokenSecretManagerInRM(csConf),
|
|
|
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a", new String[] {"a1", "a2", "a3"} );
|
|
|
+ conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a2", 30.0f);
|
|
|
+ conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 40.0f);
|
|
|
+ conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 50.0f);
|
|
|
+
|
|
|
+ cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
|
|
|
+ null, new RMContainerTokenSecretManager(conf),
|
|
|
+ new NMTokenSecretManagerInRM(conf),
|
|
|
new ClientToAMTokenSecretManagerInRM(), null));
|
|
|
|
|
|
assertEquals(1024, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getGuaranteedMB());
|
|
|
assertEquals(2048, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getGuaranteedMB());
|
|
|
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getMaxCapacityMB());
|
|
|
assertEquals(25600, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getMaxCapacityMB());
|
|
|
-
|
|
|
- Map<String, Long> guaranteedCapA2 =
|
|
|
- ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a2")
|
|
|
- .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
- .getGuaranteedCapacity();
|
|
|
- assertEquals(30, guaranteedCapA2
|
|
|
- .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
- Map<String, Long> maxCapA2 =
|
|
|
- ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a2")
|
|
|
- .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
- .getMaxCapacity();
|
|
|
- assertEquals(1000, maxCapA2
|
|
|
- .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
-
|
|
|
- Map<String, Long> guaranteedCapA3 =
|
|
|
- ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a3")
|
|
|
- .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
- .getGuaranteedCapacity();
|
|
|
- assertEquals(42, guaranteedCapA3
|
|
|
- .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
- Map<String, Long> maxCapA3 =
|
|
|
- ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a3")
|
|
|
- .getMetrics()).getQueueMetricsForCustomResources())
|
|
|
- .getMaxCapacity();
|
|
|
- assertEquals(500, maxCapA3
|
|
|
- .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
|
|
- rm.stop();
|
|
|
}
|
|
|
|
|
|
@Test
|