|
@@ -0,0 +1,265 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
|
|
+
|
|
|
+import org.apache.hadoop.util.Time;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
|
+import org.apache.hadoop.yarn.api.records.Container;
|
|
|
+import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
+import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
|
|
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
|
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
|
|
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
|
|
+import org.apache.log4j.Level;
|
|
|
+import org.apache.log4j.LogManager;
|
|
|
+import org.apache.log4j.Logger;
|
|
|
+import org.junit.Assume;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Enumeration;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.PriorityQueue;
|
|
|
+
|
|
|
+import static org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES;
|
|
|
+import static org.mockito.Mockito.mock;
|
|
|
+import static org.mockito.Mockito.when;
|
|
|
+
|
|
|
+public class TestCapacitySchedulerPerf {
|
|
|
+ private final int GB = 1024;
|
|
|
+
|
|
|
+ private String getResourceName(int idx) {
|
|
|
+ return "resource-" + idx;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void testUserLimitThroughputWithNumberOfResourceTypes(
|
|
|
+ int numOfResourceTypes)
|
|
|
+ throws Exception {
|
|
|
+ if (numOfResourceTypes > 2) {
|
|
|
+ // Initialize resource map
|
|
|
+ Map<String, ResourceInformation> riMap = new HashMap<>();
|
|
|
+
|
|
|
+ // Initialize mandatory resources
|
|
|
+ riMap.put(ResourceInformation.MEMORY_URI, ResourceInformation.MEMORY_MB);
|
|
|
+ riMap.put(ResourceInformation.VCORES_URI, ResourceInformation.VCORES);
|
|
|
+
|
|
|
+ for (int i = 2; i < numOfResourceTypes; i++) {
|
|
|
+ String resourceName = getResourceName(i);
|
|
|
+ riMap.put(resourceName, ResourceInformation
|
|
|
+ .newInstance(resourceName, "", 0, ResourceTypes.COUNTABLE, 0,
|
|
|
+ Integer.MAX_VALUE));
|
|
|
+ }
|
|
|
+
|
|
|
+ ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Since this is more of a performance unit test, only run if
|
|
|
+ // RunUserLimitThroughput is set (-DRunUserLimitThroughput=true)
|
|
|
+ Assume.assumeTrue(Boolean.valueOf(
|
|
|
+ System.getProperty("RunCapacitySchedulerPerfTests")));
|
|
|
+
|
|
|
+ CapacitySchedulerConfiguration csconf =
|
|
|
+ new CapacitySchedulerConfiguration();
|
|
|
+ csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
|
|
|
+ csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
|
|
|
+ csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
|
|
|
+ 100.0f);
|
|
|
+ csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
|
|
|
+ csconf.setResourceComparator(DominantResourceCalculator.class);
|
|
|
+
|
|
|
+ YarnConfiguration conf = new YarnConfiguration(csconf);
|
|
|
+ // Don't reset resource types since we have already configured resource types
|
|
|
+ conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false);
|
|
|
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
+ ResourceScheduler.class);
|
|
|
+
|
|
|
+ MockRM rm = new MockRM(conf);
|
|
|
+ rm.start();
|
|
|
+
|
|
|
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
|
|
+ LeafQueue qb = (LeafQueue)cs.getQueue("default");
|
|
|
+
|
|
|
+ // For now make user limit large so we can activate all applications
|
|
|
+ qb.setUserLimitFactor((float)100.0);
|
|
|
+ qb.setupConfigurableCapacities();
|
|
|
+
|
|
|
+ SchedulerEvent addAppEvent;
|
|
|
+ SchedulerEvent addAttemptEvent;
|
|
|
+ Container container = mock(Container.class);
|
|
|
+ ApplicationSubmissionContext submissionContext =
|
|
|
+ mock(ApplicationSubmissionContext.class);
|
|
|
+
|
|
|
+ final int appCount = 100;
|
|
|
+ ApplicationId[] appids = new ApplicationId[appCount];
|
|
|
+ RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount];
|
|
|
+ ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount];
|
|
|
+ RMAppImpl[] apps = new RMAppImpl[appCount];
|
|
|
+ RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount];
|
|
|
+ for (int i=0; i<appCount; i++) {
|
|
|
+ appids[i] = BuilderUtils.newApplicationId(100, i);
|
|
|
+ appAttemptIds[i] =
|
|
|
+ BuilderUtils.newApplicationAttemptId(appids[i], 1);
|
|
|
+
|
|
|
+ attemptMetrics[i] =
|
|
|
+ new RMAppAttemptMetrics(appAttemptIds[i], rm.getRMContext());
|
|
|
+ apps[i] = mock(RMAppImpl.class);
|
|
|
+ when(apps[i].getApplicationId()).thenReturn(appids[i]);
|
|
|
+ attempts[i] = mock(RMAppAttemptImpl.class);
|
|
|
+ when(attempts[i].getMasterContainer()).thenReturn(container);
|
|
|
+ when(attempts[i].getSubmissionContext()).thenReturn(submissionContext);
|
|
|
+ when(attempts[i].getAppAttemptId()).thenReturn(appAttemptIds[i]);
|
|
|
+ when(attempts[i].getRMAppAttemptMetrics()).thenReturn(attemptMetrics[i]);
|
|
|
+ when(apps[i].getCurrentAppAttempt()).thenReturn(attempts[i]);
|
|
|
+
|
|
|
+ rm.getRMContext().getRMApps().put(appids[i], apps[i]);
|
|
|
+ addAppEvent =
|
|
|
+ new AppAddedSchedulerEvent(appids[i], "default", "user1");
|
|
|
+ cs.handle(addAppEvent);
|
|
|
+ addAttemptEvent =
|
|
|
+ new AppAttemptAddedSchedulerEvent(appAttemptIds[i], false);
|
|
|
+ cs.handle(addAttemptEvent);
|
|
|
+ }
|
|
|
+
|
|
|
+ // add nodes to cluster, so cluster has 20GB and 20 vcores
|
|
|
+ Resource nodeResource = Resource.newInstance(10 * GB, 10);
|
|
|
+ if (numOfResourceTypes > 2) {
|
|
|
+ for (int i = 2; i < numOfResourceTypes; i++) {
|
|
|
+ nodeResource.setResourceValue(getResourceName(i), 10);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ RMNode node = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.1");
|
|
|
+ cs.handle(new NodeAddedSchedulerEvent(node));
|
|
|
+
|
|
|
+ RMNode node2 = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.2");
|
|
|
+ cs.handle(new NodeAddedSchedulerEvent(node2));
|
|
|
+
|
|
|
+ Priority u0Priority = TestUtils.createMockPriority(1);
|
|
|
+ RecordFactory recordFactory =
|
|
|
+ RecordFactoryProvider.getRecordFactory(null);
|
|
|
+
|
|
|
+ FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[appCount];
|
|
|
+ for (int i=0;i<appCount;i++) {
|
|
|
+ fiCaApps[i] =
|
|
|
+ cs.getSchedulerApplications().get(apps[i].getApplicationId())
|
|
|
+ .getCurrentAppAttempt();
|
|
|
+
|
|
|
+ ResourceRequest resourceRequest = TestUtils.createResourceRequest(
|
|
|
+ ResourceRequest.ANY, 1 * GB, 1, true, u0Priority, recordFactory);
|
|
|
+ if (numOfResourceTypes > 2) {
|
|
|
+ for (int j = 2; j < numOfResourceTypes; j++) {
|
|
|
+ resourceRequest.getCapability().setResourceValue(getResourceName(j),
|
|
|
+ 10);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // allocate container for app2 with 1GB memory and 1 vcore
|
|
|
+ fiCaApps[i].updateResourceRequests(
|
|
|
+ Collections.singletonList(resourceRequest));
|
|
|
+ }
|
|
|
+ // Now force everything to be over user limit
|
|
|
+ qb.setUserLimitFactor((float)0.0);
|
|
|
+
|
|
|
+ // Quiet the loggers while measuring throughput
|
|
|
+ for (Enumeration<?> loggers = LogManager.getCurrentLoggers();
|
|
|
+ loggers.hasMoreElements(); ) {
|
|
|
+ Logger logger = (Logger) loggers.nextElement();
|
|
|
+ logger.setLevel(Level.WARN);
|
|
|
+ }
|
|
|
+ final int topn = 20;
|
|
|
+ final int iterations = 2000000;
|
|
|
+ final int printInterval = 20000;
|
|
|
+ final float numerator = 1000.0f * printInterval;
|
|
|
+ PriorityQueue<Long> queue = new PriorityQueue<>(topn,
|
|
|
+ Collections.reverseOrder());
|
|
|
+
|
|
|
+ long n = Time.monotonicNow();
|
|
|
+ long timespent = 0;
|
|
|
+ for (int i = 0; i < iterations; i+=2) {
|
|
|
+ if (i > 0 && i % printInterval == 0){
|
|
|
+ long ts = (Time.monotonicNow() - n);
|
|
|
+ if (queue.size() < topn) {
|
|
|
+ queue.offer(ts);
|
|
|
+ } else {
|
|
|
+ Long last = queue.peek();
|
|
|
+ if (last > ts) {
|
|
|
+ queue.poll();
|
|
|
+ queue.offer(ts);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ System.out.println(i + " " + (numerator / ts));
|
|
|
+ n= Time.monotonicNow();
|
|
|
+ }
|
|
|
+ cs.handle(new NodeUpdateSchedulerEvent(node));
|
|
|
+ cs.handle(new NodeUpdateSchedulerEvent(node2));
|
|
|
+ }
|
|
|
+ timespent=0;
|
|
|
+ int entries = queue.size();
|
|
|
+ while(queue.size() > 0){
|
|
|
+ long l = queue.poll();
|
|
|
+ timespent += l;
|
|
|
+ }
|
|
|
+ System.out.println(
|
|
|
+ "#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + entries
|
|
|
+ + ": " + numerator / (timespent / entries));
|
|
|
+ rm.stop();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout = 300000)
|
|
|
+ public void testUserLimitThroughputForTwoResources() throws Exception {
|
|
|
+ testUserLimitThroughputWithNumberOfResourceTypes(2);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout = 300000)
|
|
|
+ public void testUserLimitThroughputForThreeResources() throws Exception {
|
|
|
+ testUserLimitThroughputWithNumberOfResourceTypes(3);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout = 300000)
|
|
|
+ public void testUserLimitThroughputForFourResources() throws Exception {
|
|
|
+ testUserLimitThroughputWithNumberOfResourceTypes(4);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout = 300000)
|
|
|
+ public void testUserLimitThroughputForFiveResources() throws Exception {
|
|
|
+ testUserLimitThroughputWithNumberOfResourceTypes(5);
|
|
|
+ }
|
|
|
+}
|