|
@@ -97,23 +97,24 @@ import com.google.common.base.Supplier;
|
|
|
|
|
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
|
|
@RunWith(value = Parameterized.class)
|
|
|
-public class TestWorkPreservingRMRestart {
|
|
|
+public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
private YarnConfiguration conf;
|
|
|
- private Class<?> schedulerClass;
|
|
|
MockRM rm1 = null;
|
|
|
MockRM rm2 = null;
|
|
|
|
|
|
+ public TestWorkPreservingRMRestart(SchedulerType type) {
|
|
|
+ super(type);
|
|
|
+ }
|
|
|
+
|
|
|
@Before
|
|
|
public void setup() throws UnknownHostException {
|
|
|
Logger rootLogger = LogManager.getRootLogger();
|
|
|
rootLogger.setLevel(Level.DEBUG);
|
|
|
- conf = new YarnConfiguration();
|
|
|
+ conf = getConf();
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
|
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
|
|
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
|
|
- conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerClass,
|
|
|
- ResourceScheduler.class);
|
|
|
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
|
|
|
conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0);
|
|
|
DefaultMetricsSystem.setMiniClusterMode(true);
|
|
@@ -129,16 +130,6 @@ public class TestWorkPreservingRMRestart {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Parameterized.Parameters
|
|
|
- public static Collection<Object[]> getTestParameters() {
|
|
|
- return Arrays.asList(new Object[][] { { CapacityScheduler.class },
|
|
|
- { FifoScheduler.class }, {FairScheduler.class } });
|
|
|
- }
|
|
|
-
|
|
|
- public TestWorkPreservingRMRestart(Class<?> schedulerClass) {
|
|
|
- this.schedulerClass = schedulerClass;
|
|
|
- }
|
|
|
-
|
|
|
// Test common scheduler state including SchedulerAttempt, SchedulerNode,
|
|
|
// AppSchedulingInfo can be reconstructed via the container recovery reports
|
|
|
// on NM re-registration.
|
|
@@ -159,9 +150,6 @@ public class TestWorkPreservingRMRestart {
|
|
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
memStore.init(conf);
|
|
|
rm1 = new MockRM(conf, memStore);
|
|
|
- if (schedulerClass.equals(FairScheduler.class)) {
|
|
|
- initFairScheduler(rm1);
|
|
|
- }
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
|
@@ -174,9 +162,6 @@ public class TestWorkPreservingRMRestart {
|
|
|
|
|
|
// Re-start RM
|
|
|
rm2 = new MockRM(conf, memStore);
|
|
|
- if (schedulerClass.equals(FairScheduler.class)) {
|
|
|
- initFairScheduler(rm2);
|
|
|
- }
|
|
|
rm2.start();
|
|
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
|
|
// recover app
|
|
@@ -249,11 +234,9 @@ public class TestWorkPreservingRMRestart {
|
|
|
SchedulerApplication schedulerApp =
|
|
|
schedulerApps.get(recoveredApp1.getApplicationId());
|
|
|
|
|
|
- if (schedulerClass.equals(CapacityScheduler.class)) {
|
|
|
+ if (getSchedulerType() == SchedulerType.CAPACITY) {
|
|
|
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
|
|
|
- } else if (schedulerClass.equals(FifoScheduler.class)) {
|
|
|
- checkFifoQueue(rm2, schedulerApp, usedResources, availableResources);
|
|
|
- } else if (schedulerClass.equals(FairScheduler.class)) {
|
|
|
+ } else {
|
|
|
checkFSQueue(rm2, schedulerApp, usedResources, availableResources);
|
|
|
}
|
|
|
|
|
@@ -324,25 +307,6 @@ public class TestWorkPreservingRMRestart {
|
|
|
.getUsed());
|
|
|
}
|
|
|
|
|
|
- private void checkFifoQueue(ResourceManager rm,
|
|
|
- SchedulerApplication schedulerApp, Resource usedResources,
|
|
|
- Resource availableResources) throws Exception {
|
|
|
- FifoScheduler scheduler = (FifoScheduler) rm.getResourceScheduler();
|
|
|
- // ************ check cluster used Resources ********
|
|
|
- assertEquals(usedResources, scheduler.getUsedResource());
|
|
|
-
|
|
|
- // ************ check app headroom ****************
|
|
|
- SchedulerApplicationAttempt schedulerAttempt =
|
|
|
- schedulerApp.getCurrentAppAttempt();
|
|
|
- assertEquals(availableResources, schedulerAttempt.getHeadroom());
|
|
|
-
|
|
|
- // ************ check queue metrics ****************
|
|
|
- QueueMetrics queueMetrics = scheduler.getRootQueueMetrics();
|
|
|
- assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
|
|
- availableResources.getVirtualCores(), usedResources.getMemory(),
|
|
|
- usedResources.getVirtualCores());
|
|
|
- }
|
|
|
-
|
|
|
private void checkFSQueue(ResourceManager rm,
|
|
|
SchedulerApplication schedulerApp, Resource usedResources,
|
|
|
Resource availableResources) throws Exception {
|
|
@@ -379,29 +343,6 @@ public class TestWorkPreservingRMRestart {
|
|
|
usedResources.getVirtualCores());
|
|
|
}
|
|
|
|
|
|
- private void initFairScheduler(ResourceManager rm) throws IOException {
|
|
|
- FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler();
|
|
|
- String testDir =
|
|
|
- new File(
|
|
|
- System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
|
|
|
- String allocFile = new File(testDir, "test-queues").getAbsolutePath();
|
|
|
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
|
|
|
-
|
|
|
- PrintWriter out = new PrintWriter(new FileWriter(allocFile));
|
|
|
- out.println("<?xml version=\"1.0\"?>");
|
|
|
- out.println("<allocations>");
|
|
|
- out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
|
|
|
- out.println("<queue name=\"root\">");
|
|
|
- out.println(" <schedulingPolicy>drf</schedulingPolicy>");
|
|
|
- out.println(" <weight>1.0</weight>");
|
|
|
- out.println(" <fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>");
|
|
|
- out.println(" <minSharePreemptionTimeout>120</minSharePreemptionTimeout>");
|
|
|
- out.println(" <fairSharePreemptionThreshold>.5</fairSharePreemptionThreshold>");
|
|
|
- out.println("</queue>");
|
|
|
- out.println("</allocations>");
|
|
|
- out.close();
|
|
|
- }
|
|
|
-
|
|
|
// create 3 container reports for AM
|
|
|
public static List<NMContainerStatus>
|
|
|
createNMContainerStatusForApp(MockAM am) {
|
|
@@ -468,7 +409,7 @@ public class TestWorkPreservingRMRestart {
|
|
|
// 10. Assert each user's consumption inside the queue.
|
|
|
@Test (timeout = 30000)
|
|
|
public void testCapacitySchedulerRecovery() throws Exception {
|
|
|
- if (!schedulerClass.equals(CapacityScheduler.class)) {
|
|
|
+ if (getSchedulerType() != SchedulerType.CAPACITY) {
|
|
|
return;
|
|
|
}
|
|
|
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
|
|
@@ -587,7 +528,7 @@ public class TestWorkPreservingRMRestart {
|
|
|
//3. Verify that the expected exception was thrown
|
|
|
@Test (timeout = 30000, expected = QueueNotFoundException.class)
|
|
|
public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
|
|
|
- if (!schedulerClass.equals(CapacityScheduler.class)) {
|
|
|
+ if (getSchedulerType() != SchedulerType.CAPACITY) {
|
|
|
throw new QueueNotFoundException("Dummy");
|
|
|
}
|
|
|
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
|