|
@@ -72,7 +72,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
@@ -100,6 +99,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlaceme
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
|
+import org.apache.hadoop.yarn.util.ControlledClock;
|
|
|
+import org.apache.hadoop.yarn.util.SystemClock;
|
|
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Assert;
|
|
@@ -1489,7 +1490,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
|
|
|
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
|
|
|
|
|
|
- MockClock clock = new MockClock();
|
|
|
+ ControlledClock clock = new ControlledClock();
|
|
|
scheduler.setClock(clock);
|
|
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
@@ -1587,7 +1588,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
scheduler.getSchedulerApp(app2).getPreemptionContainers()));
|
|
|
|
|
|
// Pretend 15 seconds have passed
|
|
|
- clock.tick(15);
|
|
|
+ clock.tickSec(15);
|
|
|
|
|
|
// Trigger a kill by insisting we want containers back
|
|
|
scheduler.preemptResources(Resources.createResource(2 * 1024));
|
|
@@ -1617,7 +1618,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
scheduler.preemptResources(Resources.createResource(2 * 1024));
|
|
|
|
|
|
// Pretend 15 seconds have passed
|
|
|
- clock.tick(15);
|
|
|
+ clock.tickSec(15);
|
|
|
|
|
|
// We should be able to claw back another container from A and B each.
|
|
|
// For queueA (fifo), continue preempting from app2.
|
|
@@ -1649,7 +1650,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
|
|
|
|
|
|
- MockClock clock = new MockClock();
|
|
|
+ ControlledClock clock = new ControlledClock();
|
|
|
scheduler.setClock(clock);
|
|
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
@@ -1702,7 +1703,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
scheduler.update();
|
|
|
|
|
|
// Let 11 sec pass
|
|
|
- clock.tick(11);
|
|
|
+ clock.tickSec(11);
|
|
|
|
|
|
scheduler.update();
|
|
|
Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager()
|
|
@@ -1722,7 +1723,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
*/
|
|
|
public void testPreemptionDecision() throws Exception {
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
- MockClock clock = new MockClock();
|
|
|
+ ControlledClock clock = new ControlledClock();
|
|
|
scheduler.setClock(clock);
|
|
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
@@ -1833,7 +1834,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
Resources.none(), scheduler.resToPreempt(schedD, clock.getTime())));
|
|
|
// After minSharePreemptionTime has passed, they should want to preempt min
|
|
|
// share.
|
|
|
- clock.tick(6);
|
|
|
+ clock.tickSec(6);
|
|
|
assertEquals(
|
|
|
1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
|
|
|
assertEquals(
|
|
@@ -1842,7 +1843,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
// After fairSharePreemptionTime has passed, they should want to preempt
|
|
|
// fair share.
|
|
|
scheduler.update();
|
|
|
- clock.tick(6);
|
|
|
+ clock.tickSec(6);
|
|
|
assertEquals(
|
|
|
1536 , scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
|
|
|
assertEquals(
|
|
@@ -1855,7 +1856,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
*/
|
|
|
public void testPreemptionDecisionWithVariousTimeout() throws Exception {
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
- MockClock clock = new MockClock();
|
|
|
+ ControlledClock clock = new ControlledClock();
|
|
|
scheduler.setClock(clock);
|
|
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
@@ -1971,7 +1972,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
|
|
|
// After 5 seconds, queueB1 wants to preempt min share
|
|
|
scheduler.update();
|
|
|
- clock.tick(6);
|
|
|
+ clock.tickSec(6);
|
|
|
assertEquals(
|
|
|
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
|
|
|
assertEquals(
|
|
@@ -1981,7 +1982,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
|
|
|
// After 10 seconds, queueB2 wants to preempt min share
|
|
|
scheduler.update();
|
|
|
- clock.tick(5);
|
|
|
+ clock.tickSec(5);
|
|
|
assertEquals(
|
|
|
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
|
|
|
assertEquals(
|
|
@@ -1991,7 +1992,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
|
|
|
// After 15 seconds, queueC wants to preempt min share
|
|
|
scheduler.update();
|
|
|
- clock.tick(5);
|
|
|
+ clock.tickSec(5);
|
|
|
assertEquals(
|
|
|
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
|
|
|
assertEquals(
|
|
@@ -2001,7 +2002,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
|
|
|
// After 20 seconds, queueB2 should want to preempt fair share
|
|
|
scheduler.update();
|
|
|
- clock.tick(5);
|
|
|
+ clock.tickSec(5);
|
|
|
assertEquals(
|
|
|
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
|
|
|
assertEquals(
|
|
@@ -2011,7 +2012,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
|
|
|
// After 25 seconds, queueB1 should want to preempt fair share
|
|
|
scheduler.update();
|
|
|
- clock.tick(5);
|
|
|
+ clock.tickSec(5);
|
|
|
assertEquals(
|
|
|
1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
|
|
|
assertEquals(
|
|
@@ -2021,7 +2022,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
|
|
|
// After 30 seconds, queueC should want to preempt fair share
|
|
|
scheduler.update();
|
|
|
- clock.tick(5);
|
|
|
+ clock.tickSec(5);
|
|
|
assertEquals(
|
|
|
1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
|
|
|
assertEquals(
|
|
@@ -3703,7 +3704,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
@Test
|
|
|
public void testMaxRunningAppsHierarchicalQueues() throws Exception {
|
|
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
|
|
- MockClock clock = new MockClock();
|
|
|
+ ControlledClock clock = new ControlledClock();
|
|
|
scheduler.setClock(clock);
|
|
|
|
|
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
|
@@ -3728,28 +3729,28 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1");
|
|
|
verifyAppRunnable(attId1, true);
|
|
|
verifyQueueNumRunnable("queue1.sub1", 1, 0);
|
|
|
- clock.tick(10);
|
|
|
+ clock.tickSec(10);
|
|
|
// exceeds no limits
|
|
|
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1.sub3", "user1");
|
|
|
verifyAppRunnable(attId2, true);
|
|
|
verifyQueueNumRunnable("queue1.sub3", 1, 0);
|
|
|
- clock.tick(10);
|
|
|
+ clock.tickSec(10);
|
|
|
// exceeds no limits
|
|
|
ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1.sub2", "user1");
|
|
|
verifyAppRunnable(attId3, true);
|
|
|
verifyQueueNumRunnable("queue1.sub2", 1, 0);
|
|
|
- clock.tick(10);
|
|
|
+ clock.tickSec(10);
|
|
|
// exceeds queue1 limit
|
|
|
ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1.sub2", "user1");
|
|
|
verifyAppRunnable(attId4, false);
|
|
|
verifyQueueNumRunnable("queue1.sub2", 1, 1);
|
|
|
- clock.tick(10);
|
|
|
+ clock.tickSec(10);
|
|
|
// exceeds sub3 limit
|
|
|
ApplicationAttemptId attId5 = createSchedulingRequest(1024, "queue1.sub3", "user1");
|
|
|
verifyAppRunnable(attId5, false);
|
|
|
verifyQueueNumRunnable("queue1.sub3", 1, 1);
|
|
|
- clock.tick(10);
|
|
|
-
|
|
|
+ clock.tickSec(10);
|
|
|
+
|
|
|
// Even though the app was removed from sub3, the app from sub2 gets to go
|
|
|
// because it came in first
|
|
|
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
|
|
@@ -3923,7 +3924,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
public void testRecoverRequestAfterPreemption() throws Exception {
|
|
|
conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
|
|
|
|
|
|
- MockClock clock = new MockClock();
|
|
|
+ ControlledClock clock = new ControlledClock();
|
|
|
scheduler.setClock(clock);
|
|
|
scheduler.init(conf);
|
|
|
scheduler.start();
|
|
@@ -3974,8 +3975,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
scheduler.warnOrKillContainer(rmContainer);
|
|
|
|
|
|
// Wait for few clock ticks
|
|
|
- clock.tick(5);
|
|
|
-
|
|
|
+ clock.tickSec(5);
|
|
|
+
|
|
|
// preempt now
|
|
|
scheduler.warnOrKillContainer(rmContainer);
|
|
|
|