|
@@ -63,11 +63,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
|
@@ -92,6 +96,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
+import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
|
|
import org.apache.hadoop.yarn.api.records.QueueACL;
|
|
@@ -116,6 +121,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
|
|
@@ -144,6 +150,7 @@ import org.junit.Assert;
|
|
|
import org.junit.BeforeClass;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
+import com.google.common.collect.ImmutableSet;
|
|
|
import com.google.common.collect.Sets;
|
|
|
|
|
|
public class TestClientRMService {
|
|
@@ -189,13 +196,18 @@ public class TestClientRMService {
|
|
|
};
|
|
|
};
|
|
|
rm.start();
|
|
|
+ RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager();
|
|
|
+ labelsMgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
|
|
|
|
|
|
- // Add a healthy node
|
|
|
+ // Add a healthy node with label = x
|
|
|
MockNM node = rm.registerNode("host1:1234", 1024);
|
|
|
+ Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>();
|
|
|
+ map.put(node.getNodeId(), ImmutableSet.of("x"));
|
|
|
+ labelsMgr.replaceLabelsOnNode(map);
|
|
|
rm.sendNodeStarted(node);
|
|
|
node.nodeHeartbeat(true);
|
|
|
|
|
|
- // Add and lose a node
|
|
|
+ // Add and lose a node with label = y
|
|
|
MockNM lostNode = rm.registerNode("host2:1235", 1024);
|
|
|
rm.sendNodeStarted(lostNode);
|
|
|
lostNode.nodeHeartbeat(true);
|
|
@@ -219,6 +231,9 @@ public class TestClientRMService {
|
|
|
Assert.assertEquals(1, nodeReports.size());
|
|
|
Assert.assertNotSame("Node is expected to be healthy!", NodeState.UNHEALTHY,
|
|
|
nodeReports.get(0).getNodeState());
|
|
|
+
|
|
|
+ // Check node's label = x
|
|
|
+ Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("x"));
|
|
|
|
|
|
// Now make the node unhealthy.
|
|
|
node.nodeHeartbeat(false);
|
|
@@ -228,6 +243,11 @@ public class TestClientRMService {
|
|
|
Assert.assertEquals("Unhealthy nodes should not show up by default", 0,
|
|
|
nodeReports.size());
|
|
|
|
|
|
+ // Change label of host1 to y
|
|
|
+ map = new HashMap<NodeId, Set<String>>();
|
|
|
+ map.put(node.getNodeId(), ImmutableSet.of("y"));
|
|
|
+ labelsMgr.replaceLabelsOnNode(map);
|
|
|
+
|
|
|
// Now query for UNHEALTHY nodes
|
|
|
request = GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.UNHEALTHY));
|
|
|
nodeReports = client.getClusterNodes(request).getNodeReports();
|
|
@@ -235,11 +255,27 @@ public class TestClientRMService {
|
|
|
Assert.assertEquals("Node is expected to be unhealthy!", NodeState.UNHEALTHY,
|
|
|
nodeReports.get(0).getNodeState());
|
|
|
|
|
|
+ Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("y"));
|
|
|
+
|
|
|
+ // Remove labels of host1
|
|
|
+ map = new HashMap<NodeId, Set<String>>();
|
|
|
+ map.put(node.getNodeId(), ImmutableSet.of("y"));
|
|
|
+ labelsMgr.removeLabelsFromNode(map);
|
|
|
+
|
|
|
// Query all states should return all nodes
|
|
|
rm.registerNode("host3:1236", 1024);
|
|
|
request = GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class));
|
|
|
nodeReports = client.getClusterNodes(request).getNodeReports();
|
|
|
Assert.assertEquals(3, nodeReports.size());
|
|
|
+
|
|
|
+ // All host1-3's label should be empty (instead of null)
|
|
|
+ for (NodeReport report : nodeReports) {
|
|
|
+ Assert.assertTrue(report.getNodeLabels() != null
|
|
|
+ && report.getNodeLabels().isEmpty());
|
|
|
+ }
|
|
|
+
|
|
|
+ rpc.stopProxy(client, conf);
|
|
|
+ rm.close();
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -1321,4 +1357,54 @@ public class TestClientRMService {
|
|
|
ReservationSystemTestUtil.reservationQ);
|
|
|
return request;
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testGetNodeLabels() throws Exception {
|
|
|
+ MockRM rm = new MockRM() {
|
|
|
+ protected ClientRMService createClientRMService() {
|
|
|
+ return new ClientRMService(this.rmContext, scheduler,
|
|
|
+ this.rmAppManager, this.applicationACLsManager,
|
|
|
+ this.queueACLsManager, this.getRMContext()
|
|
|
+ .getRMDelegationTokenSecretManager());
|
|
|
+ };
|
|
|
+ };
|
|
|
+ rm.start();
|
|
|
+ RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager();
|
|
|
+ labelsMgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
|
|
|
+
|
|
|
+ Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>();
|
|
|
+ map.put(NodeId.newInstance("host1", 0), ImmutableSet.of("x"));
|
|
|
+ map.put(NodeId.newInstance("host2", 0), ImmutableSet.of("y"));
|
|
|
+ labelsMgr.replaceLabelsOnNode(map);
|
|
|
+
|
|
|
+ // Create a client.
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ YarnRPC rpc = YarnRPC.create(conf);
|
|
|
+ InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
|
|
|
+ LOG.info("Connecting to ResourceManager at " + rmAddress);
|
|
|
+ ApplicationClientProtocol client =
|
|
|
+ (ApplicationClientProtocol) rpc.getProxy(
|
|
|
+ ApplicationClientProtocol.class, rmAddress, conf);
|
|
|
+
|
|
|
+ // Get node labels collection
|
|
|
+ GetClusterNodeLabelsResponse response =
|
|
|
+ client.getClusterNodeLabels(GetClusterNodeLabelsRequest.newInstance());
|
|
|
+ Assert.assertTrue(response.getNodeLabels().containsAll(
|
|
|
+ Arrays.asList("x", "y")));
|
|
|
+
|
|
|
+ // Get node labels mapping
|
|
|
+ GetNodesToLabelsResponse response1 =
|
|
|
+ client.getNodeToLabels(GetNodesToLabelsRequest.newInstance());
|
|
|
+ Map<NodeId, Set<String>> nodeToLabels = response1.getNodeToLabels();
|
|
|
+ Assert.assertTrue(nodeToLabels.keySet().containsAll(
|
|
|
+ Arrays.asList(NodeId.newInstance("host1", 0),
|
|
|
+ NodeId.newInstance("host2", 0))));
|
|
|
+ Assert.assertTrue(nodeToLabels.get(NodeId.newInstance("host1", 0))
|
|
|
+ .containsAll(Arrays.asList("x")));
|
|
|
+ Assert.assertTrue(nodeToLabels.get(NodeId.newInstance("host2", 0))
|
|
|
+ .containsAll(Arrays.asList("y")));
|
|
|
+
|
|
|
+ rpc.stopProxy(client, conf);
|
|
|
+ rm.close();
|
|
|
+ }
|
|
|
}
|