|
@@ -232,20 +232,7 @@ public class TestApplicationCleanup {
|
|
|
containerStatuses.put(app.getApplicationId(), containerStatusList);
|
|
|
|
|
|
NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
|
|
|
- dispatcher.await();
|
|
|
- List<ContainerId> contsToClean = resp.getContainersToCleanup();
|
|
|
- int cleanedConts = contsToClean.size();
|
|
|
- waitCount = 0;
|
|
|
- while (cleanedConts < 1 && waitCount++ < 200) {
|
|
|
- LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
|
|
|
- Thread.sleep(100);
|
|
|
- resp = nm1.nodeHeartbeat(true);
|
|
|
- dispatcher.await();
|
|
|
- contsToClean = resp.getContainersToCleanup();
|
|
|
- cleanedConts += contsToClean.size();
|
|
|
- }
|
|
|
- LOG.info("Got cleanup for " + contsToClean.get(0));
|
|
|
- Assert.assertEquals(1, cleanedConts);
|
|
|
+ waitForContainerCleanup(dispatcher, nm1, resp);
|
|
|
|
|
|
// Now to test the case when RM already gave cleanup, and NM suddenly
|
|
|
// realizes that the container is running.
|
|
@@ -258,26 +245,36 @@ public class TestApplicationCleanup {
|
|
|
containerStatuses.put(app.getApplicationId(), containerStatusList);
|
|
|
|
|
|
resp = nm1.nodeHeartbeat(containerStatuses, true);
|
|
|
- dispatcher.await();
|
|
|
- contsToClean = resp.getContainersToCleanup();
|
|
|
- cleanedConts = contsToClean.size();
|
|
|
// The cleanup list won't be instantaneous as it is given out by scheduler
|
|
|
// and not RMNodeImpl.
|
|
|
- waitCount = 0;
|
|
|
- while (cleanedConts < 1 && waitCount++ < 200) {
|
|
|
- LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
|
|
|
- Thread.sleep(100);
|
|
|
- resp = nm1.nodeHeartbeat(true);
|
|
|
+ waitForContainerCleanup(dispatcher, nm1, resp);
|
|
|
+
|
|
|
+ rm.stop();
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm,
|
|
|
+ NodeHeartbeatResponse resp) throws Exception {
|
|
|
+ int waitCount = 0, cleanedConts = 0;
|
|
|
+ List<ContainerId> contsToClean;
|
|
|
+ do {
|
|
|
dispatcher.await();
|
|
|
contsToClean = resp.getContainersToCleanup();
|
|
|
cleanedConts += contsToClean.size();
|
|
|
+ if (cleanedConts >= 1) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ Thread.sleep(100);
|
|
|
+ resp = nm.nodeHeartbeat(true);
|
|
|
+ } while(waitCount++ < 200);
|
|
|
+
|
|
|
+ if (contsToClean.isEmpty()) {
|
|
|
+ LOG.error("Failed to get any containers to cleanup");
|
|
|
+ } else {
|
|
|
+ LOG.info("Got cleanup for " + contsToClean.get(0));
|
|
|
}
|
|
|
- LOG.info("Got cleanup for " + contsToClean.get(0));
|
|
|
Assert.assertEquals(1, cleanedConts);
|
|
|
-
|
|
|
- rm.stop();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId)
|
|
|
throws Exception {
|
|
|
while (true) {
|
|
@@ -400,6 +397,58 @@ public class TestApplicationCleanup {
|
|
|
rm2.stop();
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("resource")
|
|
|
+ @Test (timeout = 60000)
|
|
|
+ public void testContainerCleanupWhenRMRestartedAppNotRegistered() throws
|
|
|
+ Exception {
|
|
|
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
|
|
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
+ memStore.init(conf);
|
|
|
+
|
|
|
+ // start RM
|
|
|
+ final DrainDispatcher dispatcher = new DrainDispatcher();
|
|
|
+ MockRM rm1 = new MockRM(conf, memStore) {
|
|
|
+ @Override
|
|
|
+ protected Dispatcher createDispatcher() {
|
|
|
+ return dispatcher;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ rm1.start();
|
|
|
+ MockNM nm1 =
|
|
|
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
|
+ nm1.registerNode();
|
|
|
+
|
|
|
+ // create app and launch the AM
|
|
|
+ RMApp app0 = rm1.submitApp(200);
|
|
|
+ MockAM am0 = launchAM(app0, rm1, nm1);
|
|
|
+ nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.RUNNING);
|
|
|
+ rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
|
|
|
+
|
|
|
+ // start new RM
|
|
|
+ final DrainDispatcher dispatcher2 = new DrainDispatcher();
|
|
|
+ MockRM rm2 = new MockRM(conf, memStore) {
|
|
|
+ @Override
|
|
|
+ protected Dispatcher createDispatcher() {
|
|
|
+ return dispatcher2;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ rm2.start();
|
|
|
+
|
|
|
+ // nm1 register to rm2, and do a heartbeat
|
|
|
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
|
|
+ nm1.registerNode(Arrays.asList(app0.getApplicationId()));
|
|
|
+ rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
|
|
|
+
|
|
|
+ // Add unknown container for application unknown to scheduler
|
|
|
+ NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0
|
|
|
+ .getApplicationAttemptId(), 2, ContainerState.RUNNING);
|
|
|
+
|
|
|
+ waitForContainerCleanup(dispatcher2, nm1, response);
|
|
|
+
|
|
|
+ rm1.stop();
|
|
|
+ rm2.stop();
|
|
|
+ }
|
|
|
+
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
TestApplicationCleanup t = new TestApplicationCleanup();
|
|
|
t.testAppCleanup();
|