@@ -23,7 +23,9 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -36,6 +38,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
@@ -307,4 +312,259 @@ public class TestApplicationPriority {
         maxPriority);
     rm.stop();
   }
+
+  @Test
+  public void testUpdatePriorityAtRuntime() throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    // Set Max Application Priority as 10
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    Priority appPriority1 = Priority.newInstance(5);
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB);
+    RMApp app1 = rm.submitApp(1 * GB, appPriority1);
+
+    // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1
+    MockAM am1 = MockRM.launchAM(app1, rm, nm1);
+    am1.registerAppAttempt();
+
+    // get scheduler
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Change the priority of App1 to 8
+    Priority appPriority2 = Priority.newInstance(8);
+    cs.updateApplicationPriority(appPriority2, app1.getApplicationId());
+
+    // get scheduler app
+    FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
+        .get(app1.getApplicationId()).getCurrentAppAttempt();
+
+    // Verify whether the new priority is updated
+    Assert.assertEquals(appPriority2, schedulerAppAttempt.getPriority());
+  }
+
+  @Test
+  public void testUpdateInvalidPriorityAtRuntime() throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    // Set Max Application Priority as 10
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    Priority appPriority1 = Priority.newInstance(5);
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB);
+    RMApp app1 = rm.submitApp(1 * GB, appPriority1);
+
+    // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1
+    MockAM am1 = MockRM.launchAM(app1, rm, nm1);
+    am1.registerAppAttempt();
+
+    // get scheduler
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Change the priority of App1 to 15
+    Priority appPriority2 = Priority.newInstance(15);
+    cs.updateApplicationPriority(appPriority2, app1.getApplicationId());
+
+    // get scheduler app
+    FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
+        .get(app1.getApplicationId()).getCurrentAppAttempt();
+
+    // Verify whether priority 15 is reset to 10
+    Priority appPriority3 = Priority.newInstance(10);
+    Assert.assertEquals(appPriority3, schedulerAppAttempt.getPriority());
+    rm.stop();
+  }
+
+  @Test(timeout = 180000)
+  public void testRMRestartWithChangeInPriority() throws Exception {
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+        false);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    RMState rmState = memStore.getState();
+    Map<ApplicationId, ApplicationStateData> rmAppState = rmState
+        .getApplicationState();
+
+    // PHASE 1: create state in an RM
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+
+    MockNM nm1 = new MockNM("127.0.0.1:1234", 15120,
+        rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    Priority appPriority1 = Priority.newInstance(5);
+    RMApp app1 = rm1.submitApp(1 * GB, appPriority1);
+
+    // kick the scheduler, 1 GB given to AM1
+    MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
+    am1.registerAppAttempt();
+
+    // get scheduler
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    // Change the priority of App1 to 8
+    Priority appPriority2 = Priority.newInstance(8);
+    cs.updateApplicationPriority(appPriority2, app1.getApplicationId());
+
+    // wait for the priority update to be persisted to the state store
+    Thread.sleep(1000);
+
+    // create new RM to represent restart and recover state
+    MockRM rm2 = new MockRM(conf, memStore);
+
+    // start new RM
+    rm2.start();
+    // change NM to point to new RM
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+    // Verify RM Apps after this restart
+    Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
+
+    // get the recovered app
+    RMApp loadedApp = rm2.getRMContext().getRMApps()
+        .get(app1.getApplicationId());
+
+    // Verify that the updated priority (8) is recovered after restart
+    Assert.assertEquals(appPriority2, loadedApp.getCurrentAppAttempt()
+        .getSubmissionContext().getPriority());
+
+    rm2.stop();
+    rm1.stop();
+  }
+
+  @Test
+  public void testApplicationPriorityAllocationWithChangeInPriority()
+      throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    // Set Max Application Priority as 10
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    Priority appPriority1 = Priority.newInstance(5);
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB);
+    RMApp app1 = rm.submitApp(1 * GB, appPriority1);
+
+    // kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1
+    MockAM am1 = MockRM.launchAM(app1, rm, nm1);
+    am1.registerAppAttempt();
+
+    // add request for containers and wait for containers to be allocated.
+    int NUM_CONTAINERS = 7;
+    List<Container> allocated1 = am1.allocateAndWaitForContainers("127.0.0.1",
+        NUM_CONTAINERS, 2 * GB, nm1);
+
+    Assert.assertEquals(7, allocated1.size());
+    Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory());
+
+    // check node report, 15 GB used (1 AM and 7 containers) and 1 GB available
+    SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
+        nm1.getNodeId());
+    Assert.assertEquals(15 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(1 * GB, report_nm1.getAvailableResource().getMemory());
+
+    // Submit the second app App2 with priority 8 (Higher than App1)
+    Priority appPriority2 = Priority.newInstance(8);
+    RMApp app2 = rm.submitApp(1 * GB, appPriority2);
+
+    // kick the scheduler, 1 GB which was free is given to AM of App2
+    nm1.nodeHeartbeat(true);
+    MockAM am2 = MockRM.launchAM(app2, rm, nm1);
+    am2.registerAppAttempt();
+
+    // check node report, 16 GB used and 0 GB available
+    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
+
+    // get scheduler
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // get scheduler app
+    FiCaSchedulerApp schedulerAppAttemptApp1 = cs.getSchedulerApplications()
+        .get(app1.getApplicationId()).getCurrentAppAttempt();
+    // kill 2 containers to free up some space
+    int counter = 0;
+    for (Iterator<Container> iterator = allocated1.iterator(); iterator
+        .hasNext();) {
+      Container c = iterator.next();
+      if (++counter > 2) {
+        break;
+      }
+      cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
+      iterator.remove();
+    }
+
+    // check node report, 12 GB used and 4 GB available
+    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemory());
+
+    // add request for containers App1
+    am1.allocate("127.0.0.1", 2 * GB, 10, new ArrayList<ContainerId>());
+
+    // add request for containers App2 and wait for containers to get allocated
+    List<Container> allocated2 = am2.allocateAndWaitForContainers("127.0.0.1",
+        2, 2 * GB, nm1);
+
+    Assert.assertEquals(2, allocated2.size());
+    // check node report, 16 GB used and 0 GB available
+    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
+
+    // kill 1 more
+    counter = 0;
+    for (Iterator<Container> iterator = allocated1.iterator(); iterator
+        .hasNext();) {
+      Container c = iterator.next();
+      if (++counter > 1) {
+        break;
+      }
+      cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
+      iterator.remove();
+    }
+
+    // check node report, 14 GB used and 2 GB available
+    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(14 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemory());
+
+    // Change the priority of App2 to 3 (lower than App1's priority of 5)
+    Priority appPriority3 = Priority.newInstance(3);
+    cs.updateApplicationPriority(appPriority3, app2.getApplicationId());
+
+    // add request for containers App2
+    am2.allocate("127.0.0.1", 2 * GB, 3, new ArrayList<ContainerId>());
+
+    // add request for containers App1 and wait for containers to get allocated
+    // since App1 now has the higher priority, App1 will get the container.
+    List<Container> allocated3 = am1.allocateAndWaitForContainers("127.0.0.1",
+        1, 2 * GB, nm1);
+
+    Assert.assertEquals(1, allocated3.size());
+    // Now App1 will have 5 containers and 1 AM. App2 will have 2 containers.
+    Assert.assertEquals(6, schedulerAppAttemptApp1.getLiveContainers().size());
+    rm.stop();
+  }
 }