|
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
|
|
+import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
|
|
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
|
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
|
@@ -921,7 +922,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
|
|
ClusterMetrics.getMetrics().getUnhealthyNMs());
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
+ @SuppressWarnings({ "unchecked", "rawtypes" })
|
|
|
@Test
|
|
|
public void testHandleContainerStatusInvalidCompletions() throws Exception {
|
|
|
rm = new MockRM(new YarnConfiguration());
|
|
@@ -1075,6 +1076,113 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testNMUnregistration() throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ rm = new MockRM(conf);
|
|
|
+ rm.start();
|
|
|
+
|
|
|
+ ResourceTrackerService resourceTrackerService = rm
|
|
|
+ .getResourceTrackerService();
|
|
|
+ MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
|
|
+
|
|
|
+ int shutdownNMsCount = ClusterMetrics.getMetrics()
|
|
|
+ .getNumShutdownNMs();
|
|
|
+ NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
|
|
+ Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
|
|
+
|
|
|
+ UnRegisterNodeManagerRequest request = Records
|
|
|
+ .newRecord(UnRegisterNodeManagerRequest.class);
|
|
|
+ request.setNodeId(nm1.getNodeId());
|
|
|
+ resourceTrackerService.unRegisterNodeManager(request);
|
|
|
+ checkShutdownNMCount(rm, ++shutdownNMsCount);
|
|
|
+
|
|
|
+ // The RM should remove the node after unregistration, hence send a reboot
|
|
|
+ // command.
|
|
|
+ nodeHeartbeat = nm1.nodeHeartbeat(true);
|
|
|
+ Assert.assertTrue(NodeAction.RESYNC.equals(nodeHeartbeat.getNodeAction()));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testUnhealthyNMUnregistration() throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ rm = new MockRM(conf);
|
|
|
+ rm.start();
|
|
|
+
|
|
|
+ ResourceTrackerService resourceTrackerService = rm
|
|
|
+ .getResourceTrackerService();
|
|
|
+ MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
|
|
+ Assert.assertEquals(0, ClusterMetrics.getMetrics().getUnhealthyNMs());
|
|
|
+ // node healthy
|
|
|
+ nm1.nodeHeartbeat(true);
|
|
|
+ int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
|
|
|
+
|
|
|
+ // node unhealthy
|
|
|
+ nm1.nodeHeartbeat(false);
|
|
|
+ checkUnealthyNMCount(rm, nm1, true, 1);
|
|
|
+ UnRegisterNodeManagerRequest request = Records
|
|
|
+ .newRecord(UnRegisterNodeManagerRequest.class);
|
|
|
+ request.setNodeId(nm1.getNodeId());
|
|
|
+ resourceTrackerService.unRegisterNodeManager(request);
|
|
|
+ checkShutdownNMCount(rm, ++shutdownNMsCount);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testInvalidNMUnregistration() throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ rm = new MockRM(conf);
|
|
|
+ rm.start();
|
|
|
+ ResourceTrackerService resourceTrackerService = rm
|
|
|
+ .getResourceTrackerService();
|
|
|
+ int shutdownNMsCount = ClusterMetrics.getMetrics()
|
|
|
+ .getNumShutdownNMs();
|
|
|
+ int decommisionedNMsCount = ClusterMetrics.getMetrics()
|
|
|
+ .getNumDecommisionedNMs();
|
|
|
+
|
|
|
+ // Node not found for unregister
|
|
|
+ UnRegisterNodeManagerRequest request = Records
|
|
|
+ .newRecord(UnRegisterNodeManagerRequest.class);
|
|
|
+ request.setNodeId(BuilderUtils.newNodeId("host", 1234));
|
|
|
+ resourceTrackerService.unRegisterNodeManager(request);
|
|
|
+ checkShutdownNMCount(rm, 0);
|
|
|
+ checkDecommissionedNMCount(rm, 0);
|
|
|
+
|
|
|
+ // 1. Register the Node Manager
|
|
|
+ // 2. Exclude the same Node Manager host
|
|
|
+ // 3. Give NM heartbeat to RM
|
|
|
+ // 4. Unregister the Node Manager
|
|
|
+ MockNM nm1 = new MockNM("host1:1234", 5120, resourceTrackerService);
|
|
|
+ RegisterNodeManagerResponse response = nm1.registerNode();
|
|
|
+ Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction());
|
|
|
+ writeToHostsFile("host2");
|
|
|
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
|
|
|
+ hostFile.getAbsolutePath());
|
|
|
+ rm.getNodesListManager().refreshNodes(conf);
|
|
|
+ NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
|
|
|
+ Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction());
|
|
|
+ checkShutdownNMCount(rm, shutdownNMsCount);
|
|
|
+ checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
|
|
|
+ request.setNodeId(nm1.getNodeId());
|
|
|
+ resourceTrackerService.unRegisterNodeManager(request);
|
|
|
+ checkShutdownNMCount(rm, shutdownNMsCount);
|
|
|
+ checkDecommissionedNMCount(rm, decommisionedNMsCount);
|
|
|
+
|
|
|
+ // 1. Register the Node Manager
|
|
|
+ // 2. Exclude the same Node Manager host
|
|
|
+ // 3. Unregister the Node Manager
|
|
|
+ MockNM nm2 = new MockNM("host2:1234", 5120, resourceTrackerService);
|
|
|
+ RegisterNodeManagerResponse response2 = nm2.registerNode();
|
|
|
+ Assert.assertEquals(NodeAction.NORMAL, response2.getNodeAction());
|
|
|
+ writeToHostsFile("host1");
|
|
|
+ conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
|
|
|
+ hostFile.getAbsolutePath());
|
|
|
+ rm.getNodesListManager().refreshNodes(conf);
|
|
|
+ request.setNodeId(nm2.getNodeId());
|
|
|
+ resourceTrackerService.unRegisterNodeManager(request);
|
|
|
+ checkShutdownNMCount(rm, shutdownNMsCount);
|
|
|
+ checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
|
|
|
+ }
|
|
|
+
|
|
|
private void writeToHostsFile(String... hosts) throws IOException {
|
|
|
if (!hostFile.exists()) {
|
|
|
TEMP_DIR.mkdirs();
|
|
@@ -1110,6 +1218,19 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
|
|
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
|
|
}
|
|
|
|
|
|
+ private void checkShutdownNMCount(MockRM rm, int count)
|
|
|
+ throws InterruptedException {
|
|
|
+ int waitCount = 0;
|
|
|
+ while (ClusterMetrics.getMetrics().getNumShutdownNMs() != count
|
|
|
+ && waitCount++ < 20) {
|
|
|
+ synchronized (this) {
|
|
|
+ wait(100);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Assert.assertEquals("The shutdown metrics are not updated", count,
|
|
|
+ ClusterMetrics.getMetrics().getNumShutdownNMs());
|
|
|
+ }
|
|
|
+
|
|
|
@After
|
|
|
public void tearDown() {
|
|
|
if (hostFile != null && hostFile.exists()) {
|