|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
|
|
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.Assert.fail;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.Arrays;
|
|
@@ -27,11 +28,14 @@ import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
|
@@ -39,8 +43,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.TestWorkPreservingRMRestart;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
|
|
+import org.apache.hadoop.yarn.util.Times;
|
|
|
import org.apache.log4j.Level;
|
|
|
import org.apache.log4j.LogManager;
|
|
|
import org.apache.log4j.Logger;
|
|
@@ -60,15 +66,11 @@ public class TestApplicationLifetimeMonitor {
|
|
|
Logger rootLogger = LogManager.getRootLogger();
|
|
|
rootLogger.setLevel(Level.DEBUG);
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
|
- conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
|
- conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
|
|
|
- true);
|
|
|
- conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
|
|
- conf.setLong(YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS,
|
|
|
+ conf.setLong(YarnConfiguration.RM_APPLICATION_MONITOR_INTERVAL_MS,
|
|
|
3000L);
|
|
|
}
|
|
|
|
|
|
- @Test(timeout = 90000)
|
|
|
+ @Test(timeout = 60000)
|
|
|
public void testApplicationLifetimeMonitor() throws Exception {
|
|
|
MockRM rm = null;
|
|
|
try {
|
|
@@ -81,22 +83,64 @@ public class TestApplicationLifetimeMonitor {
|
|
|
new HashMap<ApplicationTimeoutType, Long>();
|
|
|
timeouts.put(ApplicationTimeoutType.LIFETIME, 10L);
|
|
|
RMApp app1 = rm.submitApp(1024, appPriority, timeouts);
|
|
|
+
|
|
|
+ // 20L seconds
|
|
|
+ timeouts.put(ApplicationTimeoutType.LIFETIME, 20L);
|
|
|
+ RMApp app2 = rm.submitApp(1024, appPriority, timeouts);
|
|
|
+
|
|
|
nm1.nodeHeartbeat(true);
|
|
|
// Send launch Event
|
|
|
MockAM am1 =
|
|
|
rm.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId());
|
|
|
am1.registerAppAttempt();
|
|
|
rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
|
|
- Assert.assertTrue("Applicaiton killed before lifetime value",
|
|
|
+ Assert.assertTrue("Application killed before lifetime value",
|
|
|
(System.currentTimeMillis() - app1.getSubmitTime()) > 10000);
|
|
|
+
|
|
|
+ Map<ApplicationTimeoutType, String> updateTimeout =
|
|
|
+ new HashMap<ApplicationTimeoutType, String>();
|
|
|
+ long newLifetime = 10L;
|
|
|
+ // set the new expiry time to 10 seconds from now
|
|
|
+ updateTimeout.put(ApplicationTimeoutType.LIFETIME,
|
|
|
+ Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000));
|
|
|
+ UpdateApplicationTimeoutsRequest request =
|
|
|
+ UpdateApplicationTimeoutsRequest.newInstance(app2.getApplicationId(),
|
|
|
+ updateTimeout);
|
|
|
+
|
|
|
+ Map<ApplicationTimeoutType, Long> applicationTimeouts =
|
|
|
+ app2.getApplicationTimeouts();
|
|
|
+ // has old timeout time
|
|
|
+ long beforeUpdate =
|
|
|
+ applicationTimeouts.get(ApplicationTimeoutType.LIFETIME);
|
|
|
+
|
|
|
+ // update app2 lifetime to the new time, i.e. now + timeout
|
|
|
+ rm.getRMContext().getClientRMService().updateApplicationTimeouts(request);
|
|
|
+
|
|
|
+ applicationTimeouts =
|
|
|
+ app2.getApplicationTimeouts();
|
|
|
+ long afterUpdate =
|
|
|
+ applicationTimeouts.get(ApplicationTimeoutType.LIFETIME);
|
|
|
+
|
|
|
+ Assert.assertTrue("Application lifetime value not updated",
|
|
|
+ afterUpdate > beforeUpdate);
|
|
|
+
|
|
|
+ rm.waitForState(app2.getApplicationId(), RMAppState.KILLED);
|
|
|
+ // verify for app killed with updated lifetime
|
|
|
+ Assert.assertTrue("Application killed before lifetime value",
|
|
|
+ app2.getFinishTime() > afterUpdate);
|
|
|
+
|
|
|
} finally {
|
|
|
stopRM(rm);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("rawtypes")
|
|
|
@Test(timeout = 180000)
|
|
|
public void testApplicationLifetimeOnRMRestart() throws Exception {
|
|
|
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
|
+ conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
|
|
|
+ true);
|
|
|
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
|
|
+
|
|
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
memStore.init(conf);
|
|
|
MockRM rm1 = new MockRM(conf, memStore);
|
|
@@ -115,6 +159,12 @@ public class TestApplicationLifetimeMonitor {
|
|
|
|
|
|
// Re-start RM
|
|
|
MockRM rm2 = new MockRM(conf, memStore);
|
|
|
+
|
|
|
+ // make sure app has been unregistered with old RM else both will trigger
|
|
|
+ // Expire event
|
|
|
+ rm1.getRMContext().getRMAppLifetimeMonitor().unregisterApp(
|
|
|
+ app1.getApplicationId(), ApplicationTimeoutType.LIFETIME);
|
|
|
+
|
|
|
rm2.start();
|
|
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
|
|
|
|
@@ -152,9 +202,87 @@ public class TestApplicationLifetimeMonitor {
|
|
|
|
|
|
// wait for app life time and application to be in killed state.
|
|
|
rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.KILLED);
|
|
|
- Assert.assertTrue("Applicaiton killed before lifetime value",
|
|
|
- (System.currentTimeMillis()
|
|
|
- - recoveredApp1.getSubmitTime()) > appLifetime);
|
|
|
+ Assert.assertTrue("Application killed before lifetime value",
|
|
|
+ recoveredApp1.getFinishTime() > (recoveredApp1.getSubmitTime()
|
|
|
+ + appLifetime * 1000));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout = 60000)
|
|
|
+ public void testUpdateApplicationTimeoutForStateStoreUpdateFail()
|
|
|
+ throws Exception {
|
|
|
+ MockRM rm1 = null;
|
|
|
+ try {
|
|
|
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
|
|
+
|
|
|
+ MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
|
|
+ private int count = 0;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void updateApplicationStateInternal(
|
|
|
+ ApplicationId appId, ApplicationStateData appState)
|
|
|
+ throws Exception {
|
|
|
+ // fail only on the first call; later calls delegate to the real store
|
|
|
+ if (count++ == 0) {
|
|
|
+ throw new Exception("State-store update failed");
|
|
|
+ }
|
|
|
+ super.updateApplicationStateInternal(appId, appState);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ memStore.init(conf);
|
|
|
+ rm1 = new MockRM(conf, memStore);
|
|
|
+ rm1.start();
|
|
|
+ MockNM nm1 =
|
|
|
+ new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
|
|
+ nm1.registerNode();
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
+
|
|
|
+ long appLifetime = 30L;
|
|
|
+ Map<ApplicationTimeoutType, Long> timeouts =
|
|
|
+ new HashMap<ApplicationTimeoutType, Long>();
|
|
|
+ timeouts.put(ApplicationTimeoutType.LIFETIME, appLifetime);
|
|
|
+ RMApp app1 = rm1.submitApp(200, Priority.newInstance(0), timeouts);
|
|
|
+
|
|
|
+ Map<ApplicationTimeoutType, String> updateTimeout =
|
|
|
+ new HashMap<ApplicationTimeoutType, String>();
|
|
|
+ long newLifetime = 10L;
|
|
|
+ // request a new expiry of now + 10 seconds (the original lifetime is 30 seconds)
|
|
|
+ updateTimeout.put(ApplicationTimeoutType.LIFETIME,
|
|
|
+ Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000));
|
|
|
+ UpdateApplicationTimeoutsRequest request =
|
|
|
+ UpdateApplicationTimeoutsRequest.newInstance(app1.getApplicationId(),
|
|
|
+ updateTimeout);
|
|
|
+
|
|
|
+ Map<ApplicationTimeoutType, Long> applicationTimeouts =
|
|
|
+ app1.getApplicationTimeouts();
|
|
|
+ // has old timeout time
|
|
|
+ long beforeUpdate =
|
|
|
+ applicationTimeouts.get(ApplicationTimeoutType.LIFETIME);
|
|
|
+
|
|
|
+ try {
|
|
|
+ // try to update app1 lifetime to the new time, i.e. now + timeout
|
|
|
+ rm1.getRMContext().getClientRMService()
|
|
|
+ .updateApplicationTimeouts(request);
|
|
|
+ fail("Update application should fail.");
|
|
|
+ } catch (YarnException e) {
|
|
|
+ // expected
|
|
|
+ assertTrue("State-store exception does not containe appId",
|
|
|
+ e.getMessage().contains(app1.getApplicationId().toString()));
|
|
|
+ }
|
|
|
+
|
|
|
+ applicationTimeouts = app1.getApplicationTimeouts();
|
|
|
+ // has old timeout time
|
|
|
+ long afterUpdate =
|
|
|
+ applicationTimeouts.get(ApplicationTimeoutType.LIFETIME);
|
|
|
+
|
|
|
+ Assert.assertEquals("Application timeout is updated", beforeUpdate,
|
|
|
+ afterUpdate);
|
|
|
+ rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
|
|
|
+ // verify the app is killed only after the original (un-updated) lifetime
|
|
|
+ Assert.assertTrue("Application killed before lifetime value",
|
|
|
+ app1.getFinishTime() > afterUpdate);
|
|
|
+ } finally {
|
|
|
+ stopRM(rm1);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void stopRM(MockRM rm) {
|