|
@@ -33,9 +33,11 @@ import java.util.ArrayList;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
|
|
+import java.util.Enumeration;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
+import java.util.PriorityQueue;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.concurrent.BrokenBarrierException;
|
|
import java.util.concurrent.BrokenBarrierException;
|
|
import java.util.concurrent.CyclicBarrier;
|
|
import java.util.concurrent.CyclicBarrier;
|
|
@@ -50,6 +52,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
|
|
+import org.apache.hadoop.util.Time;
|
|
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
|
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
|
@@ -90,7 +93,6 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.Application;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.Application;
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
|
@@ -156,8 +158,12 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
|
+import org.apache.log4j.Level;
|
|
|
|
+import org.apache.log4j.LogManager;
|
|
|
|
+import org.apache.log4j.Logger;
|
|
import org.junit.After;
|
|
import org.junit.After;
|
|
import org.junit.Assert;
|
|
import org.junit.Assert;
|
|
|
|
+import org.junit.Assume;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
import org.mockito.Mockito;
|
|
import org.mockito.Mockito;
|
|
@@ -3492,6 +3498,7 @@ public class TestCapacityScheduler {
|
|
rm.stop();
|
|
rm.stop();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
public void testHeadRoomCalculationWithDRC() throws Exception {
|
|
public void testHeadRoomCalculationWithDRC() throws Exception {
|
|
// test with total cluster resource of 20GB memory and 20 vcores.
|
|
// test with total cluster resource of 20GB memory and 20 vcores.
|
|
@@ -4074,6 +4081,143 @@ public class TestCapacityScheduler {
|
|
rm.stop();
|
|
rm.stop();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test (timeout = 300000)
|
|
|
|
+ public void testUserLimitThroughput() throws Exception {
|
|
|
|
+ // Since this is more of a performance unit test, only run if
|
|
|
|
+ // RunUserLimitThroughput is set (-DRunUserLimitThroughput=true)
|
|
|
|
+ Assume.assumeTrue(Boolean.valueOf(
|
|
|
|
+ System.getProperty("RunUserLimitThroughput")));
|
|
|
|
+
|
|
|
|
+ CapacitySchedulerConfiguration csconf =
|
|
|
|
+ new CapacitySchedulerConfiguration();
|
|
|
|
+ csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
|
|
|
|
+ csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
|
|
|
|
+ csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
|
|
|
|
+ 100.0f);
|
|
|
|
+ csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
|
|
|
|
+ csconf.setResourceComparator(DominantResourceCalculator.class);
|
|
|
|
+
|
|
|
|
+ YarnConfiguration conf = new YarnConfiguration(csconf);
|
|
|
|
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
|
+ ResourceScheduler.class);
|
|
|
|
+
|
|
|
|
+ MockRM rm = new MockRM(conf);
|
|
|
|
+ rm.start();
|
|
|
|
+
|
|
|
|
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
|
|
|
+ LeafQueue qb = (LeafQueue)cs.getQueue("default");
|
|
|
|
+
|
|
|
|
+ // For now make user limit large so we can activate all applications
|
|
|
|
+ qb.setUserLimitFactor((float)100.0);
|
|
|
|
+ qb.setupConfigurableCapacities();
|
|
|
|
+
|
|
|
|
+ SchedulerEvent addAppEvent;
|
|
|
|
+ SchedulerEvent addAttemptEvent;
|
|
|
|
+ Container container = mock(Container.class);
|
|
|
|
+ ApplicationSubmissionContext submissionContext =
|
|
|
|
+ mock(ApplicationSubmissionContext.class);
|
|
|
|
+
|
|
|
|
+ final int appCount = 100;
|
|
|
|
+ ApplicationId[] appids = new ApplicationId[appCount];
|
|
|
|
+ RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount];
|
|
|
|
+ ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount];
|
|
|
|
+ RMAppImpl[] apps = new RMAppImpl[appCount];
|
|
|
|
+ RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount];
|
|
|
|
+ for (int i=0; i<appCount; i++) {
|
|
|
|
+ appids[i] = BuilderUtils.newApplicationId(100, i);
|
|
|
|
+ appAttemptIds[i] =
|
|
|
|
+ BuilderUtils.newApplicationAttemptId(appids[i], 1);
|
|
|
|
+
|
|
|
|
+ attemptMetrics[i] =
|
|
|
|
+ new RMAppAttemptMetrics(appAttemptIds[i], rm.getRMContext());
|
|
|
|
+ apps[i] = mock(RMAppImpl.class);
|
|
|
|
+ when(apps[i].getApplicationId()).thenReturn(appids[i]);
|
|
|
|
+ attempts[i] = mock(RMAppAttemptImpl.class);
|
|
|
|
+ when(attempts[i].getMasterContainer()).thenReturn(container);
|
|
|
|
+ when(attempts[i].getSubmissionContext()).thenReturn(submissionContext);
|
|
|
|
+ when(attempts[i].getAppAttemptId()).thenReturn(appAttemptIds[i]);
|
|
|
|
+ when(attempts[i].getRMAppAttemptMetrics()).thenReturn(attemptMetrics[i]);
|
|
|
|
+ when(apps[i].getCurrentAppAttempt()).thenReturn(attempts[i]);
|
|
|
|
+
|
|
|
|
+ rm.getRMContext().getRMApps().put(appids[i], apps[i]);
|
|
|
|
+ addAppEvent =
|
|
|
|
+ new AppAddedSchedulerEvent(appids[i], "default", "user1");
|
|
|
|
+ cs.handle(addAppEvent);
|
|
|
|
+ addAttemptEvent =
|
|
|
|
+ new AppAttemptAddedSchedulerEvent(appAttemptIds[i], false);
|
|
|
|
+ cs.handle(addAttemptEvent);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // add nodes to cluster, so cluster has 20GB and 20 vcores
|
|
|
|
+ Resource newResource = Resource.newInstance(10 * GB, 10);
|
|
|
|
+ RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
|
|
|
|
+ cs.handle(new NodeAddedSchedulerEvent(node));
|
|
|
|
+
|
|
|
|
+ Resource newResource2 = Resource.newInstance(10 * GB, 10);
|
|
|
|
+ RMNode node2 = MockNodes.newNodeInfo(0, newResource2, 1, "127.0.0.2");
|
|
|
|
+ cs.handle(new NodeAddedSchedulerEvent(node2));
|
|
|
|
+
|
|
|
|
+ Priority u0Priority = TestUtils.createMockPriority(1);
|
|
|
|
+ RecordFactory recordFactory =
|
|
|
|
+ RecordFactoryProvider.getRecordFactory(null);
|
|
|
|
+
|
|
|
|
+ FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[appCount];
|
|
|
|
+ for (int i=0;i<appCount;i++) {
|
|
|
|
+ fiCaApps[i] =
|
|
|
|
+ cs.getSchedulerApplications().get(apps[i].getApplicationId())
|
|
|
|
+ .getCurrentAppAttempt();
|
|
|
|
+ // allocate container for app2 with 1GB memory and 1 vcore
|
|
|
|
+ fiCaApps[i].updateResourceRequests(Collections.singletonList(
|
|
|
|
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
|
|
|
|
+ u0Priority, recordFactory)));
|
|
|
|
+ }
|
|
|
|
+ // Now force everything to be over user limit
|
|
|
|
+ qb.setUserLimitFactor((float)0.0);
|
|
|
|
+
|
|
|
|
+ // Quiet the loggers while measuring throughput
|
|
|
|
+ for (Enumeration<?> loggers=LogManager.getCurrentLoggers();
|
|
|
|
+ loggers.hasMoreElements(); ) {
|
|
|
|
+ Logger logger = (Logger) loggers.nextElement();
|
|
|
|
+ logger.setLevel(Level.WARN);
|
|
|
|
+ }
|
|
|
|
+ final int topn = 20;
|
|
|
|
+ final int iterations = 2000000;
|
|
|
|
+ final int printInterval = 20000;
|
|
|
|
+ final float numerator = 1000.0f * printInterval;
|
|
|
|
+ PriorityQueue<Long> queue = new PriorityQueue<>(topn,
|
|
|
|
+ Collections.reverseOrder());
|
|
|
|
+
|
|
|
|
+ long n = Time.monotonicNow();
|
|
|
|
+ long timespent = 0;
|
|
|
|
+ for (int i = 0; i < iterations; i+=2) {
|
|
|
|
+ if (i > 0 && i % printInterval == 0){
|
|
|
|
+ long ts = (Time.monotonicNow() - n);
|
|
|
|
+ if (queue.size() < topn) {
|
|
|
|
+ queue.offer(ts);
|
|
|
|
+ } else {
|
|
|
|
+ Long last = queue.peek();
|
|
|
|
+ if (last > ts) {
|
|
|
|
+ queue.poll();
|
|
|
|
+ queue.offer(ts);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ System.out.println(i + " " + (numerator / ts));
|
|
|
|
+ n= Time.monotonicNow();
|
|
|
|
+ }
|
|
|
|
+ cs.handle(new NodeUpdateSchedulerEvent(node));
|
|
|
|
+ cs.handle(new NodeUpdateSchedulerEvent(node2));
|
|
|
|
+ }
|
|
|
|
+ timespent=0;
|
|
|
|
+ int entries = queue.size();
|
|
|
|
+ while(queue.size() > 0){
|
|
|
|
+ long l = queue.poll();
|
|
|
|
+ timespent += l;
|
|
|
|
+ }
|
|
|
|
+ System.out.println("Avg of fastest " + entries + ": "
|
|
|
|
+ + numerator / (timespent / entries));
|
|
|
|
+ rm.stop();
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
public void testCSQueueBlocked() throws Exception {
|
|
public void testCSQueueBlocked() throws Exception {
|
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|