|
@@ -164,6 +164,8 @@ public class MockRM extends ResourceManager {
|
|
|
nm.nodeHeartbeat(true);
|
|
|
Thread.sleep(200);
|
|
|
}
|
|
|
+ Assert.assertNotNull("Failed in waiting for " + containerId + " " +
|
|
|
+ "allocation.", getResourceScheduler().getRMContainer(containerId));
|
|
|
}
|
|
|
|
|
|
public void waitForContainerToComplete(RMAppAttempt attempt,
|
|
@@ -662,7 +664,7 @@ public class MockRM extends ResourceManager {
|
|
|
am.waitForState(RMAppAttemptState.FINISHED);
|
|
|
rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@SuppressWarnings("rawtypes")
|
|
|
private static void waitForSchedulerAppAttemptAdded(
|
|
|
ApplicationAttemptId attemptId, MockRM rm) throws InterruptedException {
|
|
@@ -677,6 +679,9 @@ public class MockRM extends ResourceManager {
|
|
|
}
|
|
|
tick++;
|
|
|
}
|
|
|
+ Assert.assertNotNull("Timed out waiting for SchedulerApplicationAttempt=" +
|
|
|
+ attemptId + " to be added.", ((AbstractYarnScheduler)
|
|
|
+ rm.getResourceScheduler()).getApplicationAttempt(attemptId));
|
|
|
}
|
|
|
|
|
|
public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
|
|
@@ -684,6 +689,7 @@ public class MockRM extends ResourceManager {
|
|
|
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
|
|
|
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
|
|
waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
|
|
|
+ rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED);
|
|
|
System.out.println("Launch AM " + attempt.getAppAttemptId());
|
|
|
nm.nodeHeartbeat(true);
|
|
|
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|