|
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.Arrays;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
@@ -50,6 +51,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
|
|
import org.apache.hadoop.yarn.util.Times;
|
|
|
import org.apache.log4j.Level;
|
|
|
import org.apache.log4j.LogManager;
|
|
@@ -67,6 +71,9 @@ public class TestApplicationLifetimeMonitor {
|
|
|
@Before
|
|
|
public void setup() throws IOException {
|
|
|
conf = new YarnConfiguration();
|
|
|
+ // Always run for CS, since other scheduler do not support this.
|
|
|
+ conf.setClass(YarnConfiguration.RM_SCHEDULER,
|
|
|
+ CapacityScheduler.class, ResourceScheduler.class);
|
|
|
Logger rootLogger = LogManager.getRootLogger();
|
|
|
rootLogger.setLevel(Level.DEBUG);
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
@@ -78,8 +85,15 @@ public class TestApplicationLifetimeMonitor {
|
|
|
public void testApplicationLifetimeMonitor() throws Exception {
|
|
|
MockRM rm = null;
|
|
|
try {
|
|
|
+ long maxLifetime = 30L;
|
|
|
+ long defaultLifetime = 15L;
|
|
|
+
|
|
|
+ YarnConfiguration newConf =
|
|
|
+ new YarnConfiguration(setUpCSQueue(maxLifetime, defaultLifetime));
|
|
|
+ conf = new YarnConfiguration(newConf);
|
|
|
rm = new MockRM(conf);
|
|
|
rm.start();
|
|
|
+
|
|
|
Priority appPriority = Priority.newInstance(0);
|
|
|
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * 1024);
|
|
|
|
|
@@ -92,6 +106,13 @@ public class TestApplicationLifetimeMonitor {
|
|
|
timeouts.put(ApplicationTimeoutType.LIFETIME, 20L);
|
|
|
RMApp app2 = rm.submitApp(1024, appPriority, timeouts);
|
|
|
|
|
|
+ // user not set lifetime, so queue max lifetime will be considered.
|
|
|
+ RMApp app3 = rm.submitApp(1024, appPriority, Collections.emptyMap());
|
|
|
+
|
|
|
+ // asc lifetime exceeds queue max lifetime
|
|
|
+ timeouts.put(ApplicationTimeoutType.LIFETIME, 40L);
|
|
|
+ RMApp app4 = rm.submitApp(1024, appPriority, timeouts);
|
|
|
+
|
|
|
nm1.nodeHeartbeat(true);
|
|
|
// Send launch Event
|
|
|
MockAM am1 =
|
|
@@ -103,8 +124,9 @@ public class TestApplicationLifetimeMonitor {
|
|
|
|
|
|
Map<ApplicationTimeoutType, String> updateTimeout =
|
|
|
new HashMap<ApplicationTimeoutType, String>();
|
|
|
- long newLifetime = 10L;
|
|
|
- // update 10L seconds more to timeout
|
|
|
+ long newLifetime = 40L;
|
|
|
+ // update 30L seconds more to timeout which is greater than queue max
|
|
|
+ // lifetime
|
|
|
String formatISO8601 =
|
|
|
Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000);
|
|
|
updateTimeout.put(ApplicationTimeoutType.LIFETIME, formatISO8601);
|
|
@@ -142,8 +164,6 @@ public class TestApplicationLifetimeMonitor {
|
|
|
!appTimeouts.isEmpty());
|
|
|
ApplicationTimeout timeout =
|
|
|
appTimeouts.get(ApplicationTimeoutType.LIFETIME);
|
|
|
- Assert.assertEquals("Application timeout string is incorrect.",
|
|
|
- formatISO8601, timeout.getExpiryTime());
|
|
|
Assert.assertTrue("Application remaining time is incorrect",
|
|
|
timeout.getRemainingTime() > 0);
|
|
|
|
|
@@ -152,6 +172,17 @@ public class TestApplicationLifetimeMonitor {
|
|
|
Assert.assertTrue("Application killed before lifetime value",
|
|
|
app2.getFinishTime() > afterUpdate);
|
|
|
|
|
|
+ rm.waitForState(app3.getApplicationId(), RMAppState.KILLED);
|
|
|
+
|
|
|
+ // app4 submitted exceeding queue max lifetime, so killed after queue max
|
|
|
+ // lifetime.
|
|
|
+ rm.waitForState(app4.getApplicationId(), RMAppState.KILLED);
|
|
|
+ long totalTimeRun = (app4.getFinishTime() - app4.getSubmitTime()) / 1000;
|
|
|
+ Assert.assertTrue("Application killed before lifetime value",
|
|
|
+ totalTimeRun > maxLifetime);
|
|
|
+ Assert.assertTrue(
|
|
|
+ "Application killed before lifetime value " + totalTimeRun,
|
|
|
+ totalTimeRun < maxLifetime + 10L);
|
|
|
} finally {
|
|
|
stopRM(rm);
|
|
|
}
|
|
@@ -172,7 +203,7 @@ public class TestApplicationLifetimeMonitor {
|
|
|
nm1.registerNode();
|
|
|
nm1.nodeHeartbeat(true);
|
|
|
|
|
|
- long appLifetime = 60L;
|
|
|
+ long appLifetime = 30L;
|
|
|
Map<ApplicationTimeoutType, Long> timeouts =
|
|
|
new HashMap<ApplicationTimeoutType, Long>();
|
|
|
timeouts.put(ApplicationTimeoutType.LIFETIME, appLifetime);
|
|
@@ -305,6 +336,21 @@ public class TestApplicationLifetimeMonitor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private CapacitySchedulerConfiguration setUpCSQueue(long maxLifetime,
|
|
|
+ long defaultLifetime) {
|
|
|
+ CapacitySchedulerConfiguration csConf =
|
|
|
+ new CapacitySchedulerConfiguration();
|
|
|
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
|
|
+ new String[] {"default"});
|
|
|
+ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".default", 100);
|
|
|
+ csConf.setMaximumLifetimePerQueue(
|
|
|
+ CapacitySchedulerConfiguration.ROOT + ".default", maxLifetime);
|
|
|
+ csConf.setDefaultLifetimePerQueue(
|
|
|
+ CapacitySchedulerConfiguration.ROOT + ".default", defaultLifetime);
|
|
|
+
|
|
|
+ return csConf;
|
|
|
+ }
|
|
|
+
|
|
|
private void stopRM(MockRM rm) {
|
|
|
if (rm != null) {
|
|
|
rm.stop();
|