|
@@ -5151,4 +5151,64 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
assertEquals("root.user1", resourceManager.getRMContext().getRMApps()
|
|
|
.get(attId3.getApplicationId()).getQueue());
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testFairSchedulerContinuousSchedulingInitTime() throws Exception {
|
|
|
+ int DELAY_THRESHOLD_TIME_MS = 1000;
|
|
|
+ conf.set(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, "true");
|
|
|
+ conf.set(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS,
|
|
|
+ String.valueOf(DELAY_THRESHOLD_TIME_MS));
|
|
|
+ conf.set(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS,
|
|
|
+ String.valueOf(DELAY_THRESHOLD_TIME_MS));
|
|
|
+
|
|
|
+ ControlledClock clock = new ControlledClock();
|
|
|
+ scheduler.setClock(clock);
|
|
|
+ scheduler.init(conf);
|
|
|
+ scheduler.start();
|
|
|
+
|
|
|
+ int priorityValue;
|
|
|
+ Priority priority;
|
|
|
+ FSAppAttempt fsAppAttempt;
|
|
|
+ ResourceRequest request1;
|
|
|
+ ResourceRequest request2;
|
|
|
+ ApplicationAttemptId id11;
|
|
|
+
|
|
|
+ priorityValue = 1;
|
|
|
+ id11 = createAppAttemptId(1, 1);
|
|
|
+ createMockRMApp(id11);
|
|
|
+ priority = Priority.newInstance(priorityValue);
|
|
|
+ scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1",
|
|
|
+ false);
|
|
|
+ scheduler.addApplicationAttempt(id11, false, false);
|
|
|
+ fsAppAttempt = scheduler.getApplicationAttempt(id11);
|
|
|
+
|
|
|
+ String hostName = "127.0.0.1";
|
|
|
+ RMNode node1 =
|
|
|
+ MockNodes.newNodeInfo(1, Resources.createResource(16 * 1024, 16), 1,
|
|
|
+ hostName);
|
|
|
+ List<ResourceRequest> ask1 = new ArrayList<>();
|
|
|
+ request1 =
|
|
|
+ createResourceRequest(1024, 8, node1.getRackName(), priorityValue, 1,
|
|
|
+ true);
|
|
|
+ request2 =
|
|
|
+ createResourceRequest(1024, 8, ResourceRequest.ANY, priorityValue, 1,
|
|
|
+ true);
|
|
|
+ ask1.add(request1);
|
|
|
+ ask1.add(request2);
|
|
|
+ scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null,
|
|
|
+ null, null);
|
|
|
+
|
|
|
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
|
+ scheduler.handle(nodeEvent1);
|
|
|
+ FSSchedulerNode node =
|
|
|
+ (FSSchedulerNode) scheduler.getSchedulerNode(node1.getNodeID());
|
|
|
+ // Tick the time and let the fsApp startTime different from initScheduler
|
|
|
+ // time
|
|
|
+ clock.tickSec(DELAY_THRESHOLD_TIME_MS / 1000);
|
|
|
+ scheduler.attemptScheduling(node);
|
|
|
+ Map<Priority, Long> lastScheduledContainer =
|
|
|
+ fsAppAttempt.getLastScheduledContainer();
|
|
|
+ long initSchedulerTime = lastScheduledContainer.get(priority);
|
|
|
+ assertEquals(DELAY_THRESHOLD_TIME_MS, initSchedulerTime);
|
|
|
+ }
|
|
|
}
|