|
@@ -31,8 +31,6 @@ import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Set;
|
|
|
-import java.util.concurrent.CountDownLatch;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
@@ -50,6 +48,8 @@ import org.apache.hadoop.yarn.api.records.NodeState;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
|
import org.apache.hadoop.yarn.event.Event;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
|
|
@@ -141,12 +141,12 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
|
|
|
|
|
rm.getNodesListManager().refreshNodes(conf);
|
|
|
|
|
|
- checkShutdownNMCount(rm, ++metricCount);
|
|
|
+ checkDecommissionedNMCount(rm, ++metricCount);
|
|
|
|
|
|
nodeHeartbeat = nm1.nodeHeartbeat(true);
|
|
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
|
|
Assert
|
|
|
- .assertEquals(1, ClusterMetrics.getMetrics().getNumShutdownNMs());
|
|
|
+ .assertEquals(1, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
|
|
|
|
|
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
|
|
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
|
|
@@ -155,8 +155,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
|
|
nodeHeartbeat = nm3.nodeHeartbeat(true);
|
|
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
|
|
Assert.assertEquals(metricCount, ClusterMetrics.getMetrics()
|
|
|
- .getNumShutdownNMs());
|
|
|
- rm.stop();
|
|
|
+ .getNumDecommisionedNMs());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -227,7 +226,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
|
|
MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
|
|
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
|
|
assert(metrics != null);
|
|
|
- int initialMetricCount = metrics.getNumShutdownNMs();
|
|
|
+ int initialMetricCount = metrics.getNumDecommisionedNMs();
|
|
|
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
|
|
Assert.assertEquals(
|
|
|
NodeAction.NORMAL,
|
|
@@ -240,16 +239,16 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
|
|
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
|
|
|
.getAbsolutePath());
|
|
|
rm.getNodesListManager().refreshNodes(conf);
|
|
|
- checkShutdownNMCount(rm, ++initialMetricCount);
|
|
|
+ checkDecommissionedNMCount(rm, ++initialMetricCount);
|
|
|
nodeHeartbeat = nm1.nodeHeartbeat(true);
|
|
|
Assert.assertEquals(
|
|
|
- "Node should not have been shutdown.",
|
|
|
+ "Node should not have been decomissioned.",
|
|
|
NodeAction.NORMAL,
|
|
|
nodeHeartbeat.getNodeAction());
|
|
|
- NodeState nodeState =
|
|
|
- rm.getRMContext().getInactiveRMNodes().get(nm2.getNodeId()).getState();
|
|
|
- Assert.assertEquals("Node should have been shutdown but is in state" +
|
|
|
- nodeState, NodeState.SHUTDOWN, nodeState);
|
|
|
+ nodeHeartbeat = nm2.nodeHeartbeat(true);
|
|
|
+ Assert.assertEquals("Node should have been decomissioned but is in state" +
|
|
|
+ nodeHeartbeat.getNodeAction(),
|
|
|
+ NodeAction.SHUTDOWN, nodeHeartbeat.getNodeAction());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1118,6 +1117,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
|
|
rm.start();
|
|
|
ResourceTrackerService resourceTrackerService = rm
|
|
|
.getResourceTrackerService();
|
|
|
+ int shutdownNMsCount = ClusterMetrics.getMetrics()
|
|
|
+ .getNumShutdownNMs();
|
|
|
int decommisionedNMsCount = ClusterMetrics.getMetrics()
|
|
|
.getNumDecommisionedNMs();
|
|
|
|
|
@@ -1142,12 +1143,10 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
|
|
rm.getNodesListManager().refreshNodes(conf);
|
|
|
NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
|
|
|
Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction());
|
|
|
- int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
|
|
|
checkShutdownNMCount(rm, shutdownNMsCount);
|
|
|
- checkDecommissionedNMCount(rm, decommisionedNMsCount);
|
|
|
+ checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
|
|
|
request.setNodeId(nm1.getNodeId());
|
|
|
resourceTrackerService.unRegisterNodeManager(request);
|
|
|
- shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
|
|
|
checkShutdownNMCount(rm, shutdownNMsCount);
|
|
|
checkDecommissionedNMCount(rm, decommisionedNMsCount);
|
|
|
|
|
@@ -1163,9 +1162,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
|
|
rm.getNodesListManager().refreshNodes(conf);
|
|
|
request.setNodeId(nm2.getNodeId());
|
|
|
resourceTrackerService.unRegisterNodeManager(request);
|
|
|
- checkShutdownNMCount(rm, ++shutdownNMsCount);
|
|
|
- checkDecommissionedNMCount(rm, decommisionedNMsCount);
|
|
|
- rm.stop();
|
|
|
+ checkShutdownNMCount(rm, shutdownNMsCount);
|
|
|
+ checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
|
|
|
}
|
|
|
|
|
|
@Test(timeout = 30000)
|
|
@@ -1300,186 +1298,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
|
|
rm.stop();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Remove a node from all lists and check if its forgotten
|
|
|
- */
|
|
|
- @Test
|
|
|
- public void testNodeRemovalNormally() throws Exception {
|
|
|
- testNodeRemovalUtil(false);
|
|
|
- }
|
|
|
-
|
|
|
- @Test
|
|
|
- public void testNodeRemovalGracefully() throws Exception {
|
|
|
- testNodeRemovalUtil(true);
|
|
|
- }
|
|
|
-
|
|
|
- public void refreshNodesOption(boolean doGraceful, Configuration conf)
|
|
|
- throws Exception {
|
|
|
- if (doGraceful) {
|
|
|
- rm.getNodesListManager().refreshNodesGracefully(conf);
|
|
|
- } else {
|
|
|
- rm.getNodesListManager().refreshNodes(conf);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
|
|
|
- Configuration conf = new Configuration();
|
|
|
- int timeoutValue = 500;
|
|
|
- File excludeHostFile = new File(TEMP_DIR + File.separator +
|
|
|
- "excludeHostFile.txt");
|
|
|
- conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, "");
|
|
|
- conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "");
|
|
|
- conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
|
|
|
- timeoutValue);
|
|
|
- CountDownLatch latch = new CountDownLatch(1);
|
|
|
- rm = new MockRM(conf);
|
|
|
- rm.init(conf);
|
|
|
- rm.start();
|
|
|
- RMContext rmContext = rm.getRMContext();
|
|
|
- refreshNodesOption(doGraceful, conf);
|
|
|
- MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
|
|
- MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
|
|
- MockNM nm3 = rm.registerNode("localhost:4433", 1024);
|
|
|
- ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
|
|
- assert (metrics != null);
|
|
|
-
|
|
|
- //check all 3 nodes joined in as NORMAL
|
|
|
- NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
|
|
- Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
|
|
- nodeHeartbeat = nm2.nodeHeartbeat(true);
|
|
|
- Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
|
|
- nodeHeartbeat = nm3.nodeHeartbeat(true);
|
|
|
- Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
|
|
- rm.drainEvents();
|
|
|
- Assert.assertEquals("All 3 nodes should be active",
|
|
|
- metrics.getNumActiveNMs(), 3);
|
|
|
-
|
|
|
- //Remove nm2 from include list, should now be shutdown with timer test
|
|
|
- String ip = NetUtils.normalizeHostName("localhost");
|
|
|
- writeToHostsFile("host1", ip);
|
|
|
- conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
|
|
|
- .getAbsolutePath());
|
|
|
- refreshNodesOption(doGraceful, conf);
|
|
|
- nm1.nodeHeartbeat(true);
|
|
|
- rm.drainEvents();
|
|
|
- Assert.assertTrue("Node should not be in active node list",
|
|
|
- !rmContext.getRMNodes().containsKey(nm2.getNodeId()));
|
|
|
-
|
|
|
- RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
|
|
|
- Assert.assertEquals("Node should be in inactive node list",
|
|
|
- rmNode.getState(), NodeState.SHUTDOWN);
|
|
|
- Assert.assertEquals("Active nodes should be 2",
|
|
|
- metrics.getNumActiveNMs(), 2);
|
|
|
- Assert.assertEquals("Shutdown nodes should be 1",
|
|
|
- metrics.getNumShutdownNMs(), 1);
|
|
|
-
|
|
|
- int nodeRemovalTimeout =
|
|
|
- conf.getInt(
|
|
|
- YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
|
|
|
- YarnConfiguration.
|
|
|
- DEFAULT_RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC);
|
|
|
- int nodeRemovalInterval =
|
|
|
- rmContext.getNodesListManager().getNodeRemovalCheckInterval();
|
|
|
- long maxThreadSleeptime = nodeRemovalInterval + nodeRemovalTimeout;
|
|
|
- latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
|
|
|
-
|
|
|
- rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
|
|
|
- Assert.assertEquals("Node should have been forgotten!",
|
|
|
- rmNode, null);
|
|
|
- Assert.assertEquals("Shutdown nodes should be 0 now",
|
|
|
- metrics.getNumShutdownNMs(), 0);
|
|
|
-
|
|
|
- //Check node removal and re-addition before timer expires
|
|
|
- writeToHostsFile("host1", ip, "host2");
|
|
|
- refreshNodesOption(doGraceful, conf);
|
|
|
- nm2 = rm.registerNode("host2:5678", 10240);
|
|
|
- rm.drainEvents();
|
|
|
- writeToHostsFile("host1", ip);
|
|
|
- refreshNodesOption(doGraceful, conf);
|
|
|
- rm.drainEvents();
|
|
|
- rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
|
|
|
- Assert.assertEquals("Node should be shutdown",
|
|
|
- rmNode.getState(), NodeState.SHUTDOWN);
|
|
|
- Assert.assertEquals("Active nodes should be 2",
|
|
|
- metrics.getNumActiveNMs(), 2);
|
|
|
- Assert.assertEquals("Shutdown nodes should be 1",
|
|
|
- metrics.getNumShutdownNMs(), 1);
|
|
|
-
|
|
|
- //add back the node before timer expires
|
|
|
- latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS);
|
|
|
- writeToHostsFile("host1", ip, "host2");
|
|
|
- refreshNodesOption(doGraceful, conf);
|
|
|
- nm2 = rm.registerNode("host2:5678", 10240);
|
|
|
- nodeHeartbeat = nm2.nodeHeartbeat(true);
|
|
|
- rm.drainEvents();
|
|
|
- Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
|
|
- Assert.assertEquals("Shutdown nodes should be 0 now",
|
|
|
- metrics.getNumShutdownNMs(), 0);
|
|
|
- Assert.assertEquals("All 3 nodes should be active",
|
|
|
- metrics.getNumActiveNMs(), 3);
|
|
|
-
|
|
|
- //Decommission this node, check timer doesn't remove it
|
|
|
- writeToHostsFile("host1", "host2", ip);
|
|
|
- writeToHostsFile(excludeHostFile, "host2");
|
|
|
- conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, excludeHostFile
|
|
|
- .getAbsolutePath());
|
|
|
- refreshNodesOption(doGraceful, conf);
|
|
|
- rm.drainEvents();
|
|
|
- rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
|
|
|
- rmContext.getInactiveRMNodes().get(nm2.getNodeId());
|
|
|
- Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
|
|
|
- (rmNode.getState() == NodeState.DECOMMISSIONED) ||
|
|
|
- (rmNode.getState() == NodeState.DECOMMISSIONING));
|
|
|
- if (rmNode.getState() == NodeState.DECOMMISSIONED) {
|
|
|
- Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
|
|
|
- metrics.getNumDecommisionedNMs(), 1);
|
|
|
- }
|
|
|
- latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
|
|
|
-
|
|
|
- rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
|
|
|
- rmContext.getInactiveRMNodes().get(nm2.getNodeId());
|
|
|
- Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
|
|
|
- (rmNode.getState() == NodeState.DECOMMISSIONED) ||
|
|
|
- (rmNode.getState() == NodeState.DECOMMISSIONING));
|
|
|
- if (rmNode.getState() == NodeState.DECOMMISSIONED) {
|
|
|
- Assert.assertEquals("Decommissioned/ing nodes should be 1 now",
|
|
|
- metrics.getNumDecommisionedNMs(), 1);
|
|
|
- }
|
|
|
-
|
|
|
- //Test decommed/ing node that transitions to untracked,timer should remove
|
|
|
- writeToHostsFile("host1", ip, "host2");
|
|
|
- writeToHostsFile(excludeHostFile, "host2");
|
|
|
- refreshNodesOption(doGraceful, conf);
|
|
|
- nm1.nodeHeartbeat(true);
|
|
|
- //nm2.nodeHeartbeat(true);
|
|
|
- nm3.nodeHeartbeat(true);
|
|
|
- latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
|
|
|
- rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
|
|
|
- rmContext.getInactiveRMNodes().get(nm2.getNodeId());
|
|
|
- Assert.assertNotEquals("Timer for this node was not canceled!",
|
|
|
- rmNode, null);
|
|
|
- Assert.assertTrue("Node should be DECOMMISSIONED or DECOMMISSIONING",
|
|
|
- (rmNode.getState() == NodeState.DECOMMISSIONED) ||
|
|
|
- (rmNode.getState() == NodeState.DECOMMISSIONING));
|
|
|
-
|
|
|
- writeToHostsFile("host1", ip);
|
|
|
- writeToHostsFile(excludeHostFile, "");
|
|
|
- refreshNodesOption(doGraceful, conf);
|
|
|
- latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
|
|
|
- rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
|
|
|
- rmContext.getInactiveRMNodes().get(nm2.getNodeId());
|
|
|
- Assert.assertEquals("Node should have been forgotten!",
|
|
|
- rmNode, null);
|
|
|
- Assert.assertEquals("Shutdown nodes should be 0 now",
|
|
|
- metrics.getNumDecommisionedNMs(), 0);
|
|
|
- Assert.assertEquals("Shutdown nodes should be 0 now",
|
|
|
- metrics.getNumShutdownNMs(), 0);
|
|
|
- Assert.assertEquals("Active nodes should be 2",
|
|
|
- metrics.getNumActiveNMs(), 2);
|
|
|
-
|
|
|
- rm.stop();
|
|
|
- }
|
|
|
-
|
|
|
private void writeToHostsFile(String... hosts) throws IOException {
|
|
|
writeToHostsFile(hostFile, hosts);
|
|
|
}
|