|
@@ -105,8 +105,12 @@ public class MockRM extends ResourceManager {
|
|
|
|
|
|
static final Logger LOG = Logger.getLogger(MockRM.class);
|
|
|
static final String ENABLE_WEBAPP = "mockrm.webapp.enabled";
|
|
|
-
|
|
|
- final private boolean useNullRMNodeLabelsManager;
|
|
|
+ private static final int SECOND = 1000;
|
|
|
+ private static final int TIMEOUT_MS_FOR_ATTEMPT = 40 * SECOND;
|
|
|
+ private static final int TIMEOUT_MS_FOR_CONTAINER_AND_NODE = 10 * SECOND;
|
|
|
+ private static final int WAIT_MS_PER_LOOP = 10;
|
|
|
+
|
|
|
+ private final boolean useNullRMNodeLabelsManager;
|
|
|
|
|
|
public MockRM() {
|
|
|
this(new YarnConfiguration());
|
|
@@ -158,106 +162,107 @@ public class MockRM extends ResourceManager {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Wait until an application has reached a specified state.
|
|
|
+ * The timeout is 80 seconds.
|
|
|
+ * @param appId the id of an application
|
|
|
+ * @param finalState the application state waited
|
|
|
+ * @throws InterruptedException
|
|
|
+ * if interrupted while waiting for the state transition
|
|
|
+ */
|
|
|
public void waitForState(ApplicationId appId, RMAppState finalState)
|
|
|
- throws Exception {
|
|
|
+ throws InterruptedException {
|
|
|
RMApp app = getRMContext().getRMApps().get(appId);
|
|
|
Assert.assertNotNull("app shouldn't be null", app);
|
|
|
- final int timeoutMsecs = 80000;
|
|
|
- final int waitMsPerLoop = 500;
|
|
|
- int loop = 0;
|
|
|
- while (!finalState.equals(app.getState()) &&
|
|
|
- ((waitMsPerLoop * loop) < timeoutMsecs)) {
|
|
|
+ final int timeoutMsecs = 80 * SECOND;
|
|
|
+ int timeWaiting = 0;
|
|
|
+ while (!finalState.equals(app.getState())) {
|
|
|
+ if (timeWaiting >= timeoutMsecs) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
LOG.info("App : " + appId + " State is : " + app.getState() +
|
|
|
- " Waiting for state : " + finalState);
|
|
|
- Thread.yield();
|
|
|
- Thread.sleep(waitMsPerLoop);
|
|
|
- loop++;
|
|
|
- }
|
|
|
- int waitedMsecs = waitMsPerLoop * loop;
|
|
|
- LOG.info("App State is : " + app.getState());
|
|
|
- if (waitedMsecs >= timeoutMsecs) {
|
|
|
- Assert.fail("App state is not correct (timedout): expected: " +
|
|
|
- finalState + " actual: " + app.getState() +
|
|
|
- " for the application " + appId);
|
|
|
+ " Waiting for state : " + finalState);
|
|
|
+ Thread.sleep(WAIT_MS_PER_LOOP);
|
|
|
+ timeWaiting += WAIT_MS_PER_LOOP;
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
+ LOG.info("App State is : " + app.getState());
|
|
|
+ Assert.assertEquals("App State is not correct (timeout).", finalState,
|
|
|
+ app.getState());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Wait until an attempt has reached a specified state.
|
|
|
+ * The timeout is 40 seconds.
|
|
|
+ * @param attemptId the id of an attempt
|
|
|
+ * @param finalState the attempt state waited
|
|
|
+ * @throws InterruptedException
|
|
|
+ * if interrupted while waiting for the state transition
|
|
|
+ */
|
|
|
public void waitForState(ApplicationAttemptId attemptId,
|
|
|
- RMAppAttemptState finalState)
|
|
|
- throws Exception {
|
|
|
- waitForState(attemptId, finalState, 40000);
|
|
|
- }
|
|
|
-
|
|
|
+ RMAppAttemptState finalState) throws InterruptedException {
|
|
|
+ waitForState(attemptId, finalState, TIMEOUT_MS_FOR_ATTEMPT);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Wait until an attempt has reached a specified state.
|
|
|
+ * The timeout can be specified by the parameter.
|
|
|
+ * @param attemptId the id of an attempt
|
|
|
+ * @param finalState the attempt state waited
|
|
|
+ * @param timeoutMsecs the length of timeout in milliseconds
|
|
|
+ * @throws InterruptedException
|
|
|
+ * if interrupted while waiting for the state transition
|
|
|
+ */
|
|
|
public void waitForState(ApplicationAttemptId attemptId,
|
|
|
- RMAppAttemptState finalState, int timeoutMsecs) throws Exception {
|
|
|
+ RMAppAttemptState finalState, int timeoutMsecs)
|
|
|
+ throws InterruptedException {
|
|
|
RMApp app = getRMContext().getRMApps().get(attemptId.getApplicationId());
|
|
|
Assert.assertNotNull("app shouldn't be null", app);
|
|
|
RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
|
|
|
- final int minWaitMsecs = 1000;
|
|
|
- final int waitMsPerLoop = 10;
|
|
|
- int loop = 0;
|
|
|
- while (!finalState.equals(attempt.getAppAttemptState())
|
|
|
- && waitMsPerLoop * loop < timeoutMsecs) {
|
|
|
- LOG.info("AppAttempt : " + attemptId + " State is : " +
|
|
|
- attempt.getAppAttemptState() + " Waiting for state : " + finalState);
|
|
|
- Thread.yield();
|
|
|
- Thread.sleep(waitMsPerLoop);
|
|
|
- loop++;
|
|
|
- }
|
|
|
- int waitedMsecs = waitMsPerLoop * loop;
|
|
|
- if (minWaitMsecs > waitedMsecs) {
|
|
|
- Thread.sleep(minWaitMsecs - waitedMsecs);
|
|
|
- }
|
|
|
- LOG.info("Attempt State is : " + attempt.getAppAttemptState());
|
|
|
- if (waitedMsecs >= timeoutMsecs) {
|
|
|
- Assert.fail("Attempt state is not correct (timedout): expected: "
|
|
|
- + finalState + " actual: " + attempt.getAppAttemptState()+
|
|
|
- " for the application attempt " + attemptId);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void waitForContainerState(ContainerId containerId,
|
|
|
- RMContainerState state) throws Exception {
|
|
|
- // This method will assert if state is not expected after timeout.
|
|
|
- Assert.assertTrue(waitForContainerState(containerId, state, 8 * 1000));
|
|
|
- }
|
|
|
-
|
|
|
- public boolean waitForContainerState(ContainerId containerId,
|
|
|
- RMContainerState containerState, int timeoutMillisecs) throws Exception {
|
|
|
- RMContainer container = getResourceScheduler().getRMContainer(containerId);
|
|
|
- int timeoutSecs = 0;
|
|
|
- while (((container == null) || !containerState.equals(container.getState()))
|
|
|
- && timeoutSecs++ < timeoutMillisecs / 100) {
|
|
|
- if(container == null){
|
|
|
- container = getResourceScheduler().getRMContainer(containerId);
|
|
|
+ MockRM.waitForState(attempt, finalState, timeoutMsecs);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Wait until an attempt has reached a specified state.
|
|
|
+ * The timeout is 40 seconds.
|
|
|
+ * @param attempt an attempt
|
|
|
+ * @param finalState the attempt state waited
|
|
|
+ * @throws InterruptedException
|
|
|
+ * if interrupted while waiting for the state transition
|
|
|
+ */
|
|
|
+ public static void waitForState(RMAppAttempt attempt,
|
|
|
+ RMAppAttemptState finalState) throws InterruptedException {
|
|
|
+ waitForState(attempt, finalState, TIMEOUT_MS_FOR_ATTEMPT);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Wait until an attempt has reached a specified state.
|
|
|
+ * The timeout can be specified by the parameter.
|
|
|
+ * @param attempt an attempt
|
|
|
+ * @param finalState the attempt state waited
|
|
|
+ * @param timeoutMsecs the length of timeout in milliseconds
|
|
|
+ * @throws InterruptedException
|
|
|
+ * if interrupted while waiting for the state transition
|
|
|
+ */
|
|
|
+ public static void waitForState(RMAppAttempt attempt,
|
|
|
+ RMAppAttemptState finalState, int timeoutMsecs)
|
|
|
+ throws InterruptedException {
|
|
|
+ int timeWaiting = 0;
|
|
|
+ while (!finalState.equals(attempt.getAppAttemptState())) {
|
|
|
+ if (timeWaiting >= timeoutMsecs) {
|
|
|
+ break;
|
|
|
}
|
|
|
- System.out.println("Container : " + containerId +
|
|
|
- " Waiting for state : " + containerState);
|
|
|
|
|
|
- Thread.sleep(100);
|
|
|
-
|
|
|
- if (timeoutMillisecs <= timeoutSecs * 100) {
|
|
|
- return false;
|
|
|
- }
|
|
|
+ LOG.info("AppAttempt : " + attempt.getAppAttemptId() + " State is : " +
|
|
|
+ attempt.getAppAttemptState() + " Waiting for state : " + finalState);
|
|
|
+ Thread.sleep(WAIT_MS_PER_LOOP);
|
|
|
+ timeWaiting += WAIT_MS_PER_LOOP;
|
|
|
}
|
|
|
|
|
|
- System.out.println("Container State is : " + container.getState());
|
|
|
- Assert.assertEquals("Container state is not correct (timedout)",
|
|
|
- containerState, container.getState());
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- public void waitForContainerAllocated(MockNM nm, ContainerId containerId)
|
|
|
- throws Exception {
|
|
|
- int timeoutSecs = 0;
|
|
|
- while (getResourceScheduler().getRMContainer(containerId) == null
|
|
|
- && timeoutSecs++ < 40) {
|
|
|
- System.out.println("Waiting for" + containerId + " to be allocated.");
|
|
|
- nm.nodeHeartbeat(true);
|
|
|
- Thread.sleep(200);
|
|
|
- }
|
|
|
- Assert.assertNotNull("Failed in waiting for " + containerId + " " +
|
|
|
- "allocation.", getResourceScheduler().getRMContainer(containerId));
|
|
|
+ LOG.info("Attempt State is : " + attempt.getAppAttemptState());
|
|
|
+ Assert.assertEquals("Attempt state is not correct (timeout).", finalState,
|
|
|
+ attempt.getState());
|
|
|
}
|
|
|
|
|
|
public void waitForContainerToComplete(RMAppAttempt attempt,
|
|
@@ -271,7 +276,7 @@ public class MockRM extends ResourceManager {
|
|
|
return;
|
|
|
}
|
|
|
}
|
|
|
- Thread.sleep(200);
|
|
|
+ Thread.sleep(WAIT_MS_PER_LOOP);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -283,58 +288,109 @@ public class MockRM extends ResourceManager {
|
|
|
System.out.println("Application " + appId
|
|
|
+ " is waiting for AM to restart. Current has "
|
|
|
+ app.getAppAttempts().size() + " attempts.");
|
|
|
- Thread.sleep(200);
|
|
|
+ Thread.sleep(WAIT_MS_PER_LOOP);
|
|
|
}
|
|
|
return launchAndRegisterAM(app, this, nm);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Wait until a container has reached a specified state.
|
|
|
+ * The timeout is 10 seconds.
|
|
|
+ * @param nm A mock nodemanager
|
|
|
+ * @param containerId the id of a container
|
|
|
+ * @param containerState the container state waited
|
|
|
+ * @return if reach the state before timeout; false otherwise.
|
|
|
+ * @throws Exception
|
|
|
+ * if interrupted while waiting for the state transition
|
|
|
+ * or an unexpected error while MockNM is hearbeating.
|
|
|
+ */
|
|
|
public boolean waitForState(MockNM nm, ContainerId containerId,
|
|
|
RMContainerState containerState) throws Exception {
|
|
|
- // default is wait for 30,000 ms
|
|
|
- return waitForState(nm, containerId, containerState, 30 * 1000);
|
|
|
- }
|
|
|
-
|
|
|
+ return waitForState(nm, containerId, containerState,
|
|
|
+ TIMEOUT_MS_FOR_CONTAINER_AND_NODE);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Wait until a container has reached a specified state.
|
|
|
+ * The timeout is specified by the parameter.
|
|
|
+ * @param nm A mock nodemanager
|
|
|
+ * @param containerId the id of a container
|
|
|
+ * @param containerState the container state waited
|
|
|
+ * @param timeoutMsecs the length of timeout in milliseconds
|
|
|
+ * @return if reach the state before timeout; false otherwise.
|
|
|
+ * @throws Exception
|
|
|
+ * if interrupted while waiting for the state transition
|
|
|
+ * or an unexpected error while MockNM is hearbeating.
|
|
|
+ */
|
|
|
public boolean waitForState(MockNM nm, ContainerId containerId,
|
|
|
- RMContainerState containerState, int timeoutMillisecs) throws Exception {
|
|
|
+ RMContainerState containerState, int timeoutMsecs) throws Exception {
|
|
|
return waitForState(Arrays.asList(nm), containerId, containerState,
|
|
|
- timeoutMillisecs);
|
|
|
- }
|
|
|
-
|
|
|
+ timeoutMsecs);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Wait until a container has reached a specified state.
|
|
|
+ * The timeout is 10 seconds.
|
|
|
+ * @param nms array of mock nodemanagers
|
|
|
+ * @param containerId the id of a container
|
|
|
+ * @param containerState the container state waited
|
|
|
+ * @return if reach the state before timeout; false otherwise.
|
|
|
+ * @throws Exception
|
|
|
+ * if interrupted while waiting for the state transition
|
|
|
+ * or an unexpected error while MockNM is hearbeating.
|
|
|
+ */
|
|
|
+ public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
|
|
|
+ RMContainerState containerState) throws Exception {
|
|
|
+ return waitForState(nms, containerId, containerState,
|
|
|
+ TIMEOUT_MS_FOR_CONTAINER_AND_NODE);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Wait until a container has reached a specified state.
|
|
|
+ * The timeout is specified by the parameter.
|
|
|
+ * @param nms array of mock nodemanagers
|
|
|
+ * @param containerId the id of a container
|
|
|
+ * @param containerState the container state waited
|
|
|
+ * @param timeoutMsecs the length of timeout in milliseconds
|
|
|
+ * @return if reach the state before timeout; false otherwise.
|
|
|
+ * @throws Exception
|
|
|
+ * if interrupted while waiting for the state transition
|
|
|
+ * or an unexpected error while MockNM is hearbeating.
|
|
|
+ */
|
|
|
public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
|
|
|
- RMContainerState containerState, int timeoutMillisecs) throws Exception {
|
|
|
+ RMContainerState containerState, int timeoutMsecs) throws Exception {
|
|
|
RMContainer container = getResourceScheduler().getRMContainer(containerId);
|
|
|
- int timeoutSecs = 0;
|
|
|
- while(container == null && timeoutSecs++ < timeoutMillisecs / 100) {
|
|
|
+ int timeWaiting = 0;
|
|
|
+ while (container == null) {
|
|
|
+ if (timeWaiting >= timeoutMsecs) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
for (MockNM nm : nms) {
|
|
|
nm.nodeHeartbeat(true);
|
|
|
}
|
|
|
container = getResourceScheduler().getRMContainer(containerId);
|
|
|
System.out.println("Waiting for container " + containerId + " to be "
|
|
|
+ containerState + ", container is null right now.");
|
|
|
- Thread.sleep(100);
|
|
|
-
|
|
|
- if (timeoutMillisecs <= timeoutSecs * 100) {
|
|
|
+ Thread.sleep(WAIT_MS_PER_LOOP);
|
|
|
+ timeWaiting += WAIT_MS_PER_LOOP;
|
|
|
+ }
|
|
|
+
|
|
|
+ while (!containerState.equals(container.getState())) {
|
|
|
+ if (timeWaiting >= timeoutMsecs) {
|
|
|
return false;
|
|
|
}
|
|
|
- }
|
|
|
- Assert.assertNotNull("Container shouldn't be null", container);
|
|
|
- while (!containerState.equals(container.getState())
|
|
|
- && timeoutSecs++ < timeoutMillisecs / 100) {
|
|
|
+
|
|
|
System.out.println("Container : " + containerId + " State is : "
|
|
|
+ container.getState() + " Waiting for state : " + containerState);
|
|
|
for (MockNM nm : nms) {
|
|
|
nm.nodeHeartbeat(true);
|
|
|
}
|
|
|
- Thread.sleep(100);
|
|
|
-
|
|
|
- if (timeoutMillisecs <= timeoutSecs * 100) {
|
|
|
- return false;
|
|
|
- }
|
|
|
+ Thread.sleep(WAIT_MS_PER_LOOP);
|
|
|
+ timeWaiting += WAIT_MS_PER_LOOP;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
System.out.println("Container State is : " + container.getState());
|
|
|
- Assert.assertEquals("Container state is not correct (timedout)",
|
|
|
- containerState, container.getState());
|
|
|
return true;
|
|
|
}
|
|
|
|
|
@@ -638,16 +694,30 @@ public class MockRM extends ResourceManager {
|
|
|
node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.EXPIRE));
|
|
|
}
|
|
|
|
|
|
- public void NMwaitForState(NodeId nodeid, NodeState finalState)
|
|
|
- throws Exception {
|
|
|
- RMNode node = getRMContext().getRMNodes().get(nodeid);
|
|
|
+ /**
|
|
|
+ * Wait until a node has reached a specified state.
|
|
|
+ * The timeout is 10 seconds.
|
|
|
+ * @param nodeId the id of a node
|
|
|
+ * @param finalState the node state waited
|
|
|
+ * @throws InterruptedException
|
|
|
+ * if interrupted while waiting for the state transition
|
|
|
+ */
|
|
|
+ public void waitForState(NodeId nodeId, NodeState finalState)
|
|
|
+ throws InterruptedException {
|
|
|
+ RMNode node = getRMContext().getRMNodes().get(nodeId);
|
|
|
Assert.assertNotNull("node shouldn't be null", node);
|
|
|
- int timeoutSecs = 0;
|
|
|
- while (!finalState.equals(node.getState()) && timeoutSecs++ < 20) {
|
|
|
+ int timeWaiting = 0;
|
|
|
+ while (!finalState.equals(node.getState())) {
|
|
|
+ if (timeWaiting >= TIMEOUT_MS_FOR_CONTAINER_AND_NODE) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
System.out.println("Node State is : " + node.getState()
|
|
|
+ " Waiting for state : " + finalState);
|
|
|
- Thread.sleep(500);
|
|
|
+ Thread.sleep(WAIT_MS_PER_LOOP);
|
|
|
+ timeWaiting += WAIT_MS_PER_LOOP;
|
|
|
}
|
|
|
+
|
|
|
System.out.println("Node State is : " + node.getState());
|
|
|
Assert.assertEquals("Node state is not correct (timedout)", finalState,
|
|
|
node.getState());
|
|
@@ -671,7 +741,7 @@ public class MockRM extends ResourceManager {
|
|
|
public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId)
|
|
|
throws Exception {
|
|
|
MockAM am = new MockAM(getRMContext(), masterService, appAttemptId);
|
|
|
- am.waitForState(RMAppAttemptState.ALLOCATED);
|
|
|
+ waitForState(appAttemptId, RMAppAttemptState.ALLOCATED);
|
|
|
//create and set AMRMToken
|
|
|
Token<AMRMTokenIdentifier> amrmToken =
|
|
|
this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
|
|
@@ -690,7 +760,7 @@ public class MockRM extends ResourceManager {
|
|
|
public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId)
|
|
|
throws Exception {
|
|
|
MockAM am = new MockAM(getRMContext(), masterService, appAttemptId);
|
|
|
- am.waitForState(RMAppAttemptState.ALLOCATED);
|
|
|
+ waitForState(am.getApplicationAttemptId(), RMAppAttemptState.ALLOCATED);
|
|
|
getRMContext().getDispatcher().getEventHandler()
|
|
|
.handle(new RMAppAttemptEvent(appAttemptId,
|
|
|
RMAppAttemptEventType.LAUNCH_FAILED, "Failed"));
|
|
@@ -825,9 +895,9 @@ public class MockRM extends ResourceManager {
|
|
|
FinishApplicationMasterRequest.newInstance(
|
|
|
FinalApplicationStatus.SUCCEEDED, "", "");
|
|
|
am.unregisterAppAttempt(req,true);
|
|
|
- am.waitForState(RMAppAttemptState.FINISHING);
|
|
|
+ rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHING);
|
|
|
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
- am.waitForState(RMAppAttemptState.FINISHED);
|
|
|
+ rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FINISHED);
|
|
|
rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
|
|
|
}
|
|
|
|