|
@@ -78,7 +78,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
|
@@ -255,7 +257,12 @@ public class TestFairScheduler {
|
|
|
private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
|
|
|
String queueId, String userId, int numContainers, int priority) {
|
|
|
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
|
|
- scheduler.addApplicationAttempt(id, queueId, userId);
|
|
|
+ scheduler.addApplication(id.getApplicationId(), queueId, userId);
|
|
|
+ // This conditional is for testAclSubmitApplication where app is rejected
|
|
|
+ // and no app is added.
|
|
|
+ if (scheduler.applications.containsKey(id.getApplicationId())) {
|
|
|
+ scheduler.addApplicationAttempt(id);
|
|
|
+ }
|
|
|
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
|
|
ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
|
|
|
priority, numContainers, true);
|
|
@@ -583,7 +590,7 @@ public class TestFairScheduler {
|
|
|
// Make sure queue 2 is waiting with a reservation
|
|
|
assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
|
|
|
getResourceUsage().getMemory());
|
|
|
- assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory());
|
|
|
+ assertEquals(1024, scheduler.appAttempts.get(attId).getCurrentReservation().getMemory());
|
|
|
|
|
|
// Now another node checks in with capacity
|
|
|
RMNode node2 =
|
|
@@ -599,10 +606,10 @@ public class TestFairScheduler {
|
|
|
getResourceUsage().getMemory());
|
|
|
|
|
|
// The old reservation should still be there...
|
|
|
- assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory());
|
|
|
+ assertEquals(1024, scheduler.appAttempts.get(attId).getCurrentReservation().getMemory());
|
|
|
// ... but it should disappear when we update the first node.
|
|
|
scheduler.handle(updateEvent);
|
|
|
- assertEquals(0, scheduler.applications.get(attId).getCurrentReservation().getMemory());
|
|
|
+ assertEquals(0, scheduler.appAttempts.get(attId).getCurrentReservation().getMemory());
|
|
|
|
|
|
}
|
|
|
|
|
@@ -618,9 +625,13 @@ public class TestFairScheduler {
|
|
|
null, null, null, false, false, 0, null, null), null, null, 0, null);
|
|
|
appsMap.put(appAttemptId.getApplicationId(), rmApp);
|
|
|
|
|
|
- AppAttemptAddedSchedulerEvent appAddedEvent =
|
|
|
- new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user1");
|
|
|
+ AppAddedSchedulerEvent appAddedEvent =
|
|
|
+ new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default",
|
|
|
+ "user1");
|
|
|
scheduler.handle(appAddedEvent);
|
|
|
+ AppAttemptAddedSchedulerEvent attempAddedEvent =
|
|
|
+ new AppAttemptAddedSchedulerEvent(appAttemptId);
|
|
|
+ scheduler.handle(attempAddedEvent);
|
|
|
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
|
|
|
.getRunnableAppSchedulables().size());
|
|
|
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
|
|
@@ -639,10 +650,14 @@ public class TestFairScheduler {
|
|
|
null, null, null, ApplicationSubmissionContext.newInstance(null, null,
|
|
|
null, null, null, false, false, 0, null, null), null, null, 0, null);
|
|
|
appsMap.put(appAttemptId.getApplicationId(), rmApp);
|
|
|
-
|
|
|
- AppAttemptAddedSchedulerEvent appAddedEvent2 =
|
|
|
- new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user2");
|
|
|
- scheduler.handle(appAddedEvent2);
|
|
|
+
|
|
|
+ AppAddedSchedulerEvent appAddedEvent =
|
|
|
+ new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default",
|
|
|
+ "user2");
|
|
|
+ scheduler.handle(appAddedEvent);
|
|
|
+ AppAttemptAddedSchedulerEvent attempAddedEvent =
|
|
|
+ new AppAttemptAddedSchedulerEvent(appAttemptId);
|
|
|
+ scheduler.handle(attempAddedEvent);
|
|
|
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true)
|
|
|
.getRunnableAppSchedulables().size());
|
|
|
assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true)
|
|
@@ -660,8 +675,8 @@ public class TestFairScheduler {
|
|
|
|
|
|
// submit app with empty queue
|
|
|
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
|
|
|
- AppAttemptAddedSchedulerEvent appAddedEvent =
|
|
|
- new AppAttemptAddedSchedulerEvent(appAttemptId, "", "user1");
|
|
|
+ AppAddedSchedulerEvent appAddedEvent =
|
|
|
+ new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "", "user1");
|
|
|
scheduler.handle(appAddedEvent);
|
|
|
|
|
|
// submission rejected
|
|
@@ -695,7 +710,7 @@ public class TestFairScheduler {
|
|
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
ApplicationAttemptId appId;
|
|
|
- Map<ApplicationAttemptId, FSSchedulerApp> apps = scheduler.applications;
|
|
|
+ Map<ApplicationAttemptId, FSSchedulerApp> apps = scheduler.appAttempts;
|
|
|
|
|
|
List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
|
|
|
rules.add(new QueuePlacementRule.Specified().initialize(true, null));
|
|
@@ -786,11 +801,14 @@ public class TestFairScheduler {
|
|
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
|
|
|
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
|
|
|
- scheduler.addApplicationAttempt(id11, "root.queue1", "user1");
|
|
|
+ scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1");
|
|
|
+ scheduler.addApplicationAttempt(id11);
|
|
|
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
|
|
|
- scheduler.addApplicationAttempt(id21, "root.queue2", "user1");
|
|
|
+ scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1");
|
|
|
+ scheduler.addApplicationAttempt(id21);
|
|
|
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
|
|
|
- scheduler.addApplicationAttempt(id22, "root.queue2", "user1");
|
|
|
+ scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1");
|
|
|
+ scheduler.addApplicationAttempt(id22);
|
|
|
|
|
|
int minReqSize =
|
|
|
FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
|
|
@@ -831,11 +849,13 @@ public class TestFairScheduler {
|
|
|
@Test
|
|
|
public void testAppAdditionAndRemoval() throws Exception {
|
|
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
|
|
-
|
|
|
- AppAttemptAddedSchedulerEvent appAddedEvent1 =
|
|
|
- new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), "default",
|
|
|
- "user1");
|
|
|
- scheduler.handle(appAddedEvent1);
|
|
|
+ ApplicationAttemptId attemptId =createAppAttemptId(1, 1);
|
|
|
+ AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(attemptId.getApplicationId(), "default",
|
|
|
+ "user1");
|
|
|
+ scheduler.handle(appAddedEvent);
|
|
|
+ AppAttemptAddedSchedulerEvent attemptAddedEvent =
|
|
|
+ new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1));
|
|
|
+ scheduler.handle(attemptAddedEvent);
|
|
|
|
|
|
// Scheduler should have two queues (the default and the one created for user1)
|
|
|
assertEquals(2, scheduler.getQueueManager().getLeafQueues().size());
|
|
@@ -1118,12 +1138,12 @@ public class TestFairScheduler {
|
|
|
scheduler.handle(nodeUpdate3);
|
|
|
}
|
|
|
|
|
|
- assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
|
|
|
- assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
|
|
|
- assertEquals(1, scheduler.applications.get(app3).getLiveContainers().size());
|
|
|
- assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
|
|
|
- assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
|
|
|
- assertEquals(1, scheduler.applications.get(app6).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(app2).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(app3).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(app5).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(app6).getLiveContainers().size());
|
|
|
|
|
|
// Now new requests arrive from queues C and D
|
|
|
ApplicationAttemptId app7 =
|
|
@@ -1146,16 +1166,16 @@ public class TestFairScheduler {
|
|
|
// Make sure it is lowest priority container.
|
|
|
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
|
|
|
Resources.createResource(2 * 1024));
|
|
|
- assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
|
|
|
- assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
|
|
|
- assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
|
|
|
- assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(app2).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(app5).getLiveContainers().size());
|
|
|
|
|
|
// First verify we are adding containers to preemption list for the application
|
|
|
- assertTrue(!Collections.disjoint(scheduler.applications.get(app3).getLiveContainers(),
|
|
|
- scheduler.applications.get(app3).getPreemptionContainers()));
|
|
|
- assertTrue(!Collections.disjoint(scheduler.applications.get(app6).getLiveContainers(),
|
|
|
- scheduler.applications.get(app6).getPreemptionContainers()));
|
|
|
+ assertTrue(!Collections.disjoint(scheduler.appAttempts.get(app3).getLiveContainers(),
|
|
|
+ scheduler.appAttempts.get(app3).getPreemptionContainers()));
|
|
|
+ assertTrue(!Collections.disjoint(scheduler.appAttempts.get(app6).getLiveContainers(),
|
|
|
+ scheduler.appAttempts.get(app6).getPreemptionContainers()));
|
|
|
|
|
|
// Pretend 15 seconds have passed
|
|
|
clock.tick(15);
|
|
@@ -1165,8 +1185,8 @@ public class TestFairScheduler {
|
|
|
Resources.createResource(2 * 1024));
|
|
|
|
|
|
// At this point the containers should have been killed (since we are not simulating AM)
|
|
|
- assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
|
|
|
- assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
|
|
|
+ assertEquals(0, scheduler.appAttempts.get(app6).getLiveContainers().size());
|
|
|
+ assertEquals(0, scheduler.appAttempts.get(app3).getLiveContainers().size());
|
|
|
|
|
|
// Trigger a kill by insisting we want containers back
|
|
|
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
|
|
@@ -1180,22 +1200,22 @@ public class TestFairScheduler {
|
|
|
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
|
|
|
Resources.createResource(2 * 1024));
|
|
|
|
|
|
- assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
|
|
|
- assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
|
|
|
- assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
|
|
|
- assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
|
|
|
- assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size());
|
|
|
- assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size());
|
|
|
+ assertEquals(0, scheduler.appAttempts.get(app2).getLiveContainers().size());
|
|
|
+ assertEquals(0, scheduler.appAttempts.get(app3).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size());
|
|
|
+ assertEquals(0, scheduler.appAttempts.get(app5).getLiveContainers().size());
|
|
|
+ assertEquals(0, scheduler.appAttempts.get(app6).getLiveContainers().size());
|
|
|
|
|
|
// Now A and B are below fair share, so preemption shouldn't do anything
|
|
|
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
|
|
|
Resources.createResource(2 * 1024));
|
|
|
- assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
|
|
|
- assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
|
|
|
- assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
|
|
|
- assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
|
|
|
- assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size());
|
|
|
- assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(app1).getLiveContainers().size());
|
|
|
+ assertEquals(0, scheduler.appAttempts.get(app2).getLiveContainers().size());
|
|
|
+ assertEquals(0, scheduler.appAttempts.get(app3).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(app4).getLiveContainers().size());
|
|
|
+ assertEquals(0, scheduler.appAttempts.get(app5).getLiveContainers().size());
|
|
|
+ assertEquals(0, scheduler.appAttempts.get(app6).getLiveContainers().size());
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 5000)
|
|
@@ -1354,9 +1374,9 @@ public class TestFairScheduler {
|
|
|
|
|
|
// One container should get reservation and the other should get nothing
|
|
|
assertEquals(1024,
|
|
|
- scheduler.applications.get(attId1).getCurrentReservation().getMemory());
|
|
|
+ scheduler.appAttempts.get(attId1).getCurrentReservation().getMemory());
|
|
|
assertEquals(0,
|
|
|
- scheduler.applications.get(attId2).getCurrentReservation().getMemory());
|
|
|
+ scheduler.appAttempts.get(attId2).getCurrentReservation().getMemory());
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 5000)
|
|
@@ -1391,7 +1411,7 @@ public class TestFairScheduler {
|
|
|
scheduler.handle(updateEvent);
|
|
|
|
|
|
// App 1 should be running
|
|
|
- assertEquals(1, scheduler.applications.get(attId1).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(attId1).getLiveContainers().size());
|
|
|
|
|
|
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
|
|
|
"user1", 1);
|
|
@@ -1400,7 +1420,7 @@ public class TestFairScheduler {
|
|
|
scheduler.handle(updateEvent);
|
|
|
|
|
|
// App 2 should not be running
|
|
|
- assertEquals(0, scheduler.applications.get(attId2).getLiveContainers().size());
|
|
|
+ assertEquals(0, scheduler.appAttempts.get(attId2).getLiveContainers().size());
|
|
|
|
|
|
// Request another container for app 1
|
|
|
createSchedulingRequestExistingApplication(1024, 1, attId1);
|
|
@@ -1409,7 +1429,7 @@ public class TestFairScheduler {
|
|
|
scheduler.handle(updateEvent);
|
|
|
|
|
|
// Request should be fulfilled
|
|
|
- assertEquals(2, scheduler.applications.get(attId1).getLiveContainers().size());
|
|
|
+ assertEquals(2, scheduler.appAttempts.get(attId1).getLiveContainers().size());
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 5000)
|
|
@@ -1429,10 +1449,10 @@ public class TestFairScheduler {
|
|
|
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
|
|
|
scheduler.handle(updateEvent);
|
|
|
|
|
|
- FSSchedulerApp app = scheduler.applications.get(attId);
|
|
|
+ FSSchedulerApp app = scheduler.appAttempts.get(attId);
|
|
|
assertEquals(1, app.getLiveContainers().size());
|
|
|
|
|
|
- ContainerId containerId = scheduler.applications.get(attId)
|
|
|
+ ContainerId containerId = scheduler.appAttempts.get(attId)
|
|
|
.getLiveContainers().iterator().next().getContainerId();
|
|
|
|
|
|
// Cause reservation to be created
|
|
@@ -1501,9 +1521,9 @@ public class TestFairScheduler {
|
|
|
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
|
|
|
"norealuserhasthisname2", 1);
|
|
|
|
|
|
- FSSchedulerApp app1 = scheduler.applications.get(attId1);
|
|
|
+ FSSchedulerApp app1 = scheduler.appAttempts.get(attId1);
|
|
|
assertNotNull("The application was not allowed", app1);
|
|
|
- FSSchedulerApp app2 = scheduler.applications.get(attId2);
|
|
|
+ FSSchedulerApp app2 = scheduler.appAttempts.get(attId2);
|
|
|
assertNull("The application was allowed", app2);
|
|
|
}
|
|
|
|
|
@@ -1526,7 +1546,8 @@ public class TestFairScheduler {
|
|
|
scheduler.handle(nodeEvent2);
|
|
|
|
|
|
ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
|
|
- scheduler.addApplicationAttempt(appId, "queue1", "user1");
|
|
|
+ scheduler.addApplication(appId.getApplicationId(), "queue1", "user1");
|
|
|
+ scheduler.addApplicationAttempt(appId);
|
|
|
|
|
|
// 1 request with 2 nodes on the same rack. another request with 1 node on
|
|
|
// a different rack
|
|
@@ -1545,14 +1566,14 @@ public class TestFairScheduler {
|
|
|
NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1);
|
|
|
scheduler.handle(updateEvent1);
|
|
|
// should assign node local
|
|
|
- assertEquals(1, scheduler.applications.get(appId).getLiveContainers().size());
|
|
|
+ assertEquals(1, scheduler.appAttempts.get(appId).getLiveContainers().size());
|
|
|
|
|
|
// node 2 checks in
|
|
|
scheduler.update();
|
|
|
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
|
|
|
scheduler.handle(updateEvent2);
|
|
|
// should assign rack local
|
|
|
- assertEquals(2, scheduler.applications.get(appId).getLiveContainers().size());
|
|
|
+ assertEquals(2, scheduler.appAttempts.get(appId).getLiveContainers().size());
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 5000)
|
|
@@ -1571,8 +1592,8 @@ public class TestFairScheduler {
|
|
|
"user1", 2);
|
|
|
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
|
|
|
"user1", 2);
|
|
|
- FSSchedulerApp app1 = scheduler.applications.get(attId1);
|
|
|
- FSSchedulerApp app2 = scheduler.applications.get(attId2);
|
|
|
+ FSSchedulerApp app1 = scheduler.appAttempts.get(attId1);
|
|
|
+ FSSchedulerApp app2 = scheduler.appAttempts.get(attId2);
|
|
|
|
|
|
FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true);
|
|
|
queue1.setPolicy(new FifoPolicy());
|
|
@@ -1612,7 +1633,7 @@ public class TestFairScheduler {
|
|
|
|
|
|
ApplicationAttemptId attId =
|
|
|
createSchedulingRequest(1024, "root.default", "user", 8);
|
|
|
- FSSchedulerApp app = scheduler.applications.get(attId);
|
|
|
+ FSSchedulerApp app = scheduler.appAttempts.get(attId);
|
|
|
|
|
|
// set maxAssign to 2: only 2 containers should be allocated
|
|
|
scheduler.maxAssign = 2;
|
|
@@ -1674,10 +1695,10 @@ public class TestFairScheduler {
|
|
|
ApplicationAttemptId attId4 =
|
|
|
createSchedulingRequest(1024, fifoQueue, user, 4);
|
|
|
|
|
|
- FSSchedulerApp app1 = scheduler.applications.get(attId1);
|
|
|
- FSSchedulerApp app2 = scheduler.applications.get(attId2);
|
|
|
- FSSchedulerApp app3 = scheduler.applications.get(attId3);
|
|
|
- FSSchedulerApp app4 = scheduler.applications.get(attId4);
|
|
|
+ FSSchedulerApp app1 = scheduler.appAttempts.get(attId1);
|
|
|
+ FSSchedulerApp app2 = scheduler.appAttempts.get(attId2);
|
|
|
+ FSSchedulerApp app3 = scheduler.appAttempts.get(attId3);
|
|
|
+ FSSchedulerApp app4 = scheduler.appAttempts.get(attId4);
|
|
|
|
|
|
scheduler.getQueueManager().getLeafQueue(fifoQueue, true)
|
|
|
.setPolicy(SchedulingPolicy.parse("fifo"));
|
|
@@ -1764,7 +1785,7 @@ public class TestFairScheduler {
|
|
|
|
|
|
ApplicationAttemptId attId =
|
|
|
ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++);
|
|
|
- scheduler.addApplicationAttempt(attId, queue, user);
|
|
|
+ scheduler.addApplication(attId.getApplicationId(), queue, user);
|
|
|
|
|
|
numTries = 0;
|
|
|
while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
|
|
@@ -1792,7 +1813,7 @@ public class TestFairScheduler {
|
|
|
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
|
|
|
scheduler.handle(updateEvent);
|
|
|
|
|
|
- FSSchedulerApp app = scheduler.applications.get(attId);
|
|
|
+ FSSchedulerApp app = scheduler.appAttempts.get(attId);
|
|
|
assertEquals(0, app.getLiveContainers().size());
|
|
|
assertEquals(0, app.getReservedContainers().size());
|
|
|
|
|
@@ -1861,7 +1882,7 @@ public class TestFairScheduler {
|
|
|
NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
|
|
|
|
|
|
// no matter how many heartbeats, node2 should never get a container
|
|
|
- FSSchedulerApp app = scheduler.applications.get(attId1);
|
|
|
+ FSSchedulerApp app = scheduler.appAttempts.get(attId1);
|
|
|
for (int i = 0; i < 10; i++) {
|
|
|
scheduler.handle(node2UpdateEvent);
|
|
|
assertEquals(0, app.getLiveContainers().size());
|
|
@@ -1900,7 +1921,7 @@ public class TestFairScheduler {
|
|
|
NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
|
|
|
|
|
|
// no matter how many heartbeats, node2 should never get a container
|
|
|
- FSSchedulerApp app = scheduler.applications.get(attId1);
|
|
|
+ FSSchedulerApp app = scheduler.appAttempts.get(attId1);
|
|
|
for (int i = 0; i < 10; i++) {
|
|
|
scheduler.handle(node2UpdateEvent);
|
|
|
assertEquals(0, app.getLiveContainers().size());
|
|
@@ -1933,7 +1954,7 @@ public class TestFairScheduler {
|
|
|
|
|
|
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
|
|
|
"user1", 0);
|
|
|
- FSSchedulerApp app = scheduler.applications.get(attId);
|
|
|
+ FSSchedulerApp app = scheduler.appAttempts.get(attId);
|
|
|
|
|
|
ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true);
|
|
|
ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true);
|
|
@@ -1973,7 +1994,7 @@ public class TestFairScheduler {
|
|
|
|
|
|
ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default",
|
|
|
"user1", 2);
|
|
|
- FSSchedulerApp app = scheduler.applications.get(attId);
|
|
|
+ FSSchedulerApp app = scheduler.appAttempts.get(attId);
|
|
|
scheduler.update();
|
|
|
|
|
|
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
|
|
@@ -1993,10 +2014,10 @@ public class TestFairScheduler {
|
|
|
|
|
|
ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1",
|
|
|
"user1", 2);
|
|
|
- FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
|
|
|
+ FSSchedulerApp app1 = scheduler.appAttempts.get(appAttId1);
|
|
|
ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1",
|
|
|
"user1", 2);
|
|
|
- FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
|
|
|
+ FSSchedulerApp app2 = scheduler.appAttempts.get(appAttId2);
|
|
|
|
|
|
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
|
|
|
drfPolicy.initialize(scheduler.getClusterCapacity());
|
|
@@ -2034,13 +2055,13 @@ public class TestFairScheduler {
|
|
|
|
|
|
ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1",
|
|
|
"user1", 2);
|
|
|
- FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
|
|
|
+ FSSchedulerApp app1 = scheduler.appAttempts.get(appAttId1);
|
|
|
ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1",
|
|
|
"user1", 2);
|
|
|
- FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
|
|
|
+ FSSchedulerApp app2 = scheduler.appAttempts.get(appAttId2);
|
|
|
ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2",
|
|
|
"user1", 2);
|
|
|
- FSSchedulerApp app3 = scheduler.applications.get(appAttId3);
|
|
|
+ FSSchedulerApp app3 = scheduler.appAttempts.get(appAttId3);
|
|
|
|
|
|
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
|
|
|
drfPolicy.initialize(scheduler.getClusterCapacity());
|
|
@@ -2071,19 +2092,19 @@ public class TestFairScheduler {
|
|
|
ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1",
|
|
|
"user1", 2);
|
|
|
Thread.sleep(3); // so that start times will be different
|
|
|
- FSSchedulerApp app1 = scheduler.applications.get(appAttId1);
|
|
|
+ FSSchedulerApp app1 = scheduler.appAttempts.get(appAttId1);
|
|
|
ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1",
|
|
|
"user1", 2);
|
|
|
Thread.sleep(3); // so that start times will be different
|
|
|
- FSSchedulerApp app2 = scheduler.applications.get(appAttId2);
|
|
|
+ FSSchedulerApp app2 = scheduler.appAttempts.get(appAttId2);
|
|
|
ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2",
|
|
|
"user1", 2);
|
|
|
Thread.sleep(3); // so that start times will be different
|
|
|
- FSSchedulerApp app3 = scheduler.applications.get(appAttId3);
|
|
|
+ FSSchedulerApp app3 = scheduler.appAttempts.get(appAttId3);
|
|
|
ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2",
|
|
|
"user1", 2);
|
|
|
Thread.sleep(3); // so that start times will be different
|
|
|
- FSSchedulerApp app4 = scheduler.applications.get(appAttId4);
|
|
|
+ FSSchedulerApp app4 = scheduler.appAttempts.get(appAttId4);
|
|
|
|
|
|
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
|
|
|
drfPolicy.initialize(scheduler.getClusterCapacity());
|
|
@@ -2163,7 +2184,7 @@ public class TestFairScheduler {
|
|
|
NodeUpdateSchedulerEvent(node2);
|
|
|
|
|
|
// no matter how many heartbeats, node2 should never get a container
|
|
|
- FSSchedulerApp app = scheduler.applications.get(attId1);
|
|
|
+ FSSchedulerApp app = scheduler.appAttempts.get(attId1);
|
|
|
for (int i = 0; i < 10; i++) {
|
|
|
scheduler.handle(node2UpdateEvent);
|
|
|
assertEquals(0, app.getLiveContainers().size());
|
|
@@ -2178,12 +2199,12 @@ public class TestFairScheduler {
|
|
|
public void testConcurrentAccessOnApplications() throws Exception {
|
|
|
FairScheduler fs = new FairScheduler();
|
|
|
TestCapacityScheduler.verifyConcurrentAccessOnApplications(
|
|
|
- fs.applications, FSSchedulerApp.class, FSLeafQueue.class);
|
|
|
+ fs.appAttempts, FSSchedulerApp.class, FSLeafQueue.class);
|
|
|
}
|
|
|
|
|
|
|
|
|
private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) {
|
|
|
- FSSchedulerApp app = scheduler.applications.get(attId);
|
|
|
+ FSSchedulerApp app = scheduler.appAttempts.get(attId);
|
|
|
FSLeafQueue queue = app.getQueue();
|
|
|
Collection<AppSchedulable> runnableApps =
|
|
|
queue.getRunnableAppSchedulables();
|
|
@@ -2356,7 +2377,8 @@ public class TestFairScheduler {
|
|
|
// send application request
|
|
|
ApplicationAttemptId appAttemptId =
|
|
|
createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
|
|
|
- fs.addApplicationAttempt(appAttemptId, "queue11", "user11");
|
|
|
+ fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11");
|
|
|
+ fs.addApplicationAttempt(appAttemptId);
|
|
|
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
|
|
ResourceRequest request =
|
|
|
createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
|
|
@@ -2367,7 +2389,7 @@ public class TestFairScheduler {
|
|
|
// at least one pass
|
|
|
Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500);
|
|
|
|
|
|
- FSSchedulerApp app = fs.applications.get(appAttemptId);
|
|
|
+ FSSchedulerApp app = fs.appAttempts.get(appAttemptId);
|
|
|
// Wait until app gets resources.
|
|
|
while (app.getCurrentConsumption().equals(Resources.none())) { }
|
|
|
|
|
@@ -2455,7 +2477,7 @@ public class TestFairScheduler {
|
|
|
|
|
|
ApplicationAttemptId appAttemptId =
|
|
|
createSchedulingRequest(GB, "root.default", "user", 1);
|
|
|
- FSSchedulerApp app = scheduler.applications.get(appAttemptId);
|
|
|
+ FSSchedulerApp app = scheduler.appAttempts.get(appAttemptId);
|
|
|
|
|
|
// Verify the blacklist can be updated independent of requesting containers
|
|
|
scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
|
|
@@ -2465,7 +2487,7 @@ public class TestFairScheduler {
|
|
|
scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
|
|
|
Collections.<ContainerId>emptyList(), null,
|
|
|
Collections.singletonList(host));
|
|
|
- assertFalse(scheduler.applications.get(appAttemptId).isBlacklisted(host));
|
|
|
+ assertFalse(scheduler.appAttempts.get(appAttemptId).isBlacklisted(host));
|
|
|
|
|
|
List<ResourceRequest> update = Arrays.asList(
|
|
|
createResourceRequest(GB, node.getHostName(), 1, 0, true));
|
|
@@ -2527,4 +2549,12 @@ public class TestFairScheduler {
|
|
|
assertTrue(appAttIds.contains(appAttId1));
|
|
|
assertTrue(appAttIds.contains(appAttId2));
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testAddAndRemoveAppFromFairScheduler() throws Exception {
|
|
|
+ FairScheduler scheduler =
|
|
|
+ (FairScheduler) resourceManager.getResourceScheduler();
|
|
|
+ TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
|
|
|
+ scheduler.applications, scheduler, "default");
|
|
|
+ }
|
|
|
}
|