@@ -383,7 +383,7 @@ public class TestAMRestart {
   public void testAMBlacklistPreventsRestartOnSameNode() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
-    testAMBlacklistPreventRestartOnSameNode(conf);
+    testAMBlacklistPreventRestartOnSameNode(false, conf);
   }

   @Test(timeout = 100000)
@@ -393,11 +393,28 @@ public class TestAMRestart {
     conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
     conf.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
         true);
-    testAMBlacklistPreventRestartOnSameNode(conf);
+    testAMBlacklistPreventRestartOnSameNode(false, conf);
   }

-  private void testAMBlacklistPreventRestartOnSameNode(YarnConfiguration conf)
-      throws Exception{
+  @Test(timeout = 100000)
+  public void testAMBlacklistPreemption() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.AM_BLACKLISTING_ENABLED, true);
+    // set the disable threshold above 1 so the entire cluster can be blacklisted
+    conf.setFloat(YarnConfiguration.AM_BLACKLISTING_DISABLE_THRESHOLD, 1.5f);
+    // since the exit status is PREEMPTED, it should not lead to the node being
+    // blacklisted
+    testAMBlacklistPreventRestartOnSameNode(true, conf);
+  }
+
+  /**
+   * Tests AM blacklisting. In the multi-node mode (i.e. singleNode = false),
+   * it verifies that the AM container gets allocated on a node that is not
+   * blacklisted. In the single-node mode, it verifies that a PREEMPTED exit
+   * status does not blacklist the node, so the AM can still be scheduled there.
+   */
+  private void testAMBlacklistPreventRestartOnSameNode(boolean singleNode,
+      YarnConfiguration conf) throws Exception {
     MemoryRMStateStore memStore = new MemoryRMStateStore();
     memStore.init(conf);
     final DrainDispatcher dispatcher = new DrainDispatcher();
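Note on the AM_BLACKLISTING_DISABLE_THRESHOLD setting introduced in the hunk above: YARN turns AM blacklisting off once the blacklisted share of the cluster reaches this threshold, so a value above 1.0 (here 1.5f) can never trip and even the whole cluster can stay blacklisted. A minimal sketch of that gating logic, with hypothetical class and method names (this is not the RM's actual blacklist-manager code):

import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the disable-threshold gating the new test relies on.
class BlacklistThresholdSketch {
  private final Set<String> blacklistedNodes = new HashSet<>();
  private final double disableThreshold; // e.g. 1.5f in testAMBlacklistPreemption

  BlacklistThresholdSketch(double disableThreshold) {
    this.disableThreshold = disableThreshold;
  }

  void blacklistNode(String node) {
    blacklistedNodes.add(node);
  }

  // Blacklisting stays in force only while the blacklisted share of the
  // cluster is below the threshold. With a threshold above 1.0 the share
  // can never reach it, so even a fully blacklisted cluster (the
  // single-node case in the test) keeps its blacklist active.
  boolean isBlacklistActive(int clusterSize) {
    return blacklistedNodes.size() < disableThreshold * clusterSize;
  }
}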
@@ -424,9 +441,12 @@ public class TestAMRestart {
         new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
     nm1.registerNode();

-    MockNM nm2 =
-        new MockNM("127.0.0.2:2345", 8000, rm1.getResourceTrackerService());
-    nm2.registerNode();
+    MockNM nm2 = null;
+    if (!singleNode) {
+      nm2 =
+          new MockNM("127.0.0.2:2345", 8000, rm1.getResourceTrackerService());
+      nm2.registerNode();
+    }

     RMApp app1 = rm1.submitApp(200);

@@ -440,60 +460,84 @@ public class TestAMRestart {
     NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();

     MockNM currentNode, otherNode;
-    if (nodeWhereAMRan == nm1.getNodeId()) {
+    if (singleNode) {
+      Assert.assertEquals(nm1.getNodeId(), nodeWhereAMRan);
       currentNode = nm1;
-      otherNode = nm2;
+      otherNode = null; // not applicable
     } else {
-      currentNode = nm2;
-      otherNode = nm1;
+      if (nodeWhereAMRan == nm1.getNodeId()) {
+        currentNode = nm1;
+        otherNode = nm2;
+      } else {
+        currentNode = nm2;
+        otherNode = nm1;
+      }
     }

+    // set the exit status to test:
+    // any status other than SUCCESS and PREEMPTED should cause the node to be
+    // blacklisted
+    int exitStatus = singleNode ?
+        ContainerExitStatus.PREEMPTED :
+        ContainerExitStatus.INVALID;
     ContainerStatus containerStatus =
         BuilderUtils.newContainerStatus(amContainer, ContainerState.COMPLETE,
-            "", ContainerExitStatus.DISKS_FAILED, Resources.createResource(200));
+            "", exitStatus, Resources.createResource(200));
     currentNode.containerStatus(containerStatus);
     am1.waitForState(RMAppAttemptState.FAILED);
     rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);

     // restart the am
-    RMAppAttempt attempt = rm1.waitForAttemptScheduled(app1, rm1);
+    RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app1, rm1);
     System.out.println("Launch AM " + attempt.getAppAttemptId());



     currentNode.nodeHeartbeat(true);
     dispatcher.await();
-    Assert.assertEquals(
-        "AppAttemptState should still be SCHEDULED if currentNode is " +
-        "blacklisted correctly",
-        RMAppAttemptState.SCHEDULED,
-        attempt.getAppAttemptState());

-    otherNode.nodeHeartbeat(true);
-    dispatcher.await();
+    if (!singleNode) {
+      Assert.assertEquals(
+          "AppAttemptState should still be SCHEDULED if currentNode is " +
+          "blacklisted correctly",
+          RMAppAttemptState.SCHEDULED,
+          attempt.getAppAttemptState());
+
+      otherNode.nodeHeartbeat(true);
+      dispatcher.await();
+    }

     MockAM am2 = rm1.sendAMLaunched(attempt.getAppAttemptId());
     rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
-
     amContainer =
         ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
     rmContainer = scheduler.getRMContainer(amContainer);
     nodeWhereAMRan = rmContainer.getAllocatedNode();
-    Assert.assertEquals(
-        "After blacklisting AM should have run on the other node",
-        otherNode.getNodeId(), nodeWhereAMRan);
-
-    am2.registerAppAttempt();
-    rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
-
-    List<Container> allocatedContainers =
-        allocateContainers(currentNode, am2, 1);
-    Assert.assertEquals(
-        "Even though AM is blacklisted from the node, application can still " +
-        "allocate containers there",
-        currentNode.getNodeId(), allocatedContainers.get(0).getNodeId());
+    if (singleNode) {
+      // with preemption, the node should not be blacklisted and should get the
+      // assignment (with a single node)
+      Assert.assertEquals(
+          "AM should still have been able to run on the same node",
+          currentNode.getNodeId(), nodeWhereAMRan);
+    } else {
+      // with a failed status, the other node should receive the assignment
+      Assert.assertEquals(
+          "After blacklisting AM should have run on the other node",
+          otherNode.getNodeId(), nodeWhereAMRan);
+
+      am2.registerAppAttempt();
+      rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+      List<Container> allocatedContainers =
+          allocateContainers(currentNode, am2, 1);
+      Assert.assertEquals(
+          "Even though AM is blacklisted from the node, application can " +
+          "still allocate containers there",
+          currentNode.getNodeId(), allocatedContainers.get(0).getNodeId());
+    }
   }

+
   // AM container preempted, nm disk failure
   // should not be counted towards AM max retry count.
   @Test(timeout = 100000)
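On the exit-status comment in the hunk above ("any status other than SUCCESS and PREEMPTED should cause the node to be blacklisted"): the decision the test exercises boils down to the following sketch. The class and method names are hypothetical; only ContainerExitStatus is the real YARN type:

import org.apache.hadoop.yarn.api.records.ContainerExitStatus;

// Hypothetical sketch of the blacklisting decision exercised by the test.
final class AMBlacklistDecisionSketch {
  static boolean shouldBlacklistNode(int containerExitStatus) {
    switch (containerExitStatus) {
      case ContainerExitStatus.SUCCESS:    // normal completion
      case ContainerExitStatus.PREEMPTED:  // not the node's fault
        return false;
      default:
        // e.g. INVALID (multi-node case) or DISKS_FAILED blacklists the node
        return true;
    }
  }
}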
|