|
@@ -18,6 +18,13 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
|
|
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
|
|
|
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
|
|
|
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
|
|
|
+import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
import static org.mockito.Matchers.any;
|
|
@@ -1999,6 +2006,125 @@ public class TestClientRMService {
|
|
|
rm.close();
|
|
|
}
|
|
|
|
|
|
+ @Test(timeout = 120000)
|
|
|
+ public void testGetClusterNodeAttributes() throws IOException, YarnException {
|
|
|
+ MockRM rm = new MockRM() {
|
|
|
+ protected ClientRMService createClientRMService() {
|
|
|
+ return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
|
|
|
+ this.applicationACLsManager, this.queueACLsManager,
|
|
|
+ this.getRMContext().getRMDelegationTokenSecretManager());
|
|
|
+ }
|
|
|
+ };
|
|
|
+ rm.start();
|
|
|
+
|
|
|
+ NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager();
|
|
|
+ NodeId host1 = NodeId.newInstance("host1", 0);
|
|
|
+ NodeId host2 = NodeId.newInstance("host2", 0);
|
|
|
+ NodeAttribute gpu = NodeAttribute
|
|
|
+ .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
|
|
|
+ NodeAttributeType.STRING, "nvida");
|
|
|
+ NodeAttribute os = NodeAttribute
|
|
|
+ .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS",
|
|
|
+ NodeAttributeType.STRING, "windows64");
|
|
|
+ NodeAttribute docker = NodeAttribute
|
|
|
+ .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
|
|
|
+ NodeAttributeType.STRING, "docker0");
|
|
|
+ Map<String, Set<NodeAttribute>> nodes = new HashMap<>();
|
|
|
+ nodes.put(host1.getHost(), ImmutableSet.of(gpu, os));
|
|
|
+ nodes.put(host2.getHost(), ImmutableSet.of(docker));
|
|
|
+ mgr.addNodeAttributes(nodes);
|
|
|
+ // Create a client.
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ YarnRPC rpc = YarnRPC.create(conf);
|
|
|
+ InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
|
|
|
+ LOG.info("Connecting to ResourceManager at " + rmAddress);
|
|
|
+ ApplicationClientProtocol client = (ApplicationClientProtocol) rpc
|
|
|
+ .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
|
|
|
+
|
|
|
+ GetClusterNodeAttributesRequest request =
|
|
|
+ GetClusterNodeAttributesRequest.newInstance();
|
|
|
+ GetClusterNodeAttributesResponse response =
|
|
|
+ client.getClusterNodeAttributes(request);
|
|
|
+ Set<NodeAttribute> attributes = response.getNodeAttributes();
|
|
|
+ Assert.assertEquals("Size not correct", 3, attributes.size());
|
|
|
+ Assert.assertTrue(attributes.contains(gpu));
|
|
|
+ Assert.assertTrue(attributes.contains(os));
|
|
|
+ Assert.assertTrue(attributes.contains(docker));
|
|
|
+ rpc.stopProxy(client, conf);
|
|
|
+ rm.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout = 120000)
|
|
|
+ public void testGetAttributesToNodes() throws IOException, YarnException {
|
|
|
+ MockRM rm = new MockRM() {
|
|
|
+ protected ClientRMService createClientRMService() {
|
|
|
+ return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
|
|
|
+ this.applicationACLsManager, this.queueACLsManager,
|
|
|
+ this.getRMContext().getRMDelegationTokenSecretManager());
|
|
|
+ }
|
|
|
+ };
|
|
|
+ rm.start();
|
|
|
+
|
|
|
+ NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager();
|
|
|
+ String node1 = "host1";
|
|
|
+ String node2 = "host2";
|
|
|
+ NodeAttribute gpu = NodeAttribute
|
|
|
+ .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
|
|
|
+ NodeAttributeType.STRING, "nvida");
|
|
|
+ NodeAttribute os = NodeAttribute
|
|
|
+ .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS",
|
|
|
+ NodeAttributeType.STRING, "windows64");
|
|
|
+ NodeAttribute docker = NodeAttribute
|
|
|
+ .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
|
|
|
+ NodeAttributeType.STRING, "docker0");
|
|
|
+ NodeAttribute dist = NodeAttribute
|
|
|
+ .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION",
|
|
|
+ NodeAttributeType.STRING, "3_0_2");
|
|
|
+ Map<String, Set<NodeAttribute>> nodes = new HashMap<>();
|
|
|
+ nodes.put(node1, ImmutableSet.of(gpu, os, dist));
|
|
|
+ nodes.put(node2, ImmutableSet.of(docker, dist));
|
|
|
+ mgr.addNodeAttributes(nodes);
|
|
|
+ // Create a client.
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ YarnRPC rpc = YarnRPC.create(conf);
|
|
|
+ InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
|
|
|
+ LOG.info("Connecting to ResourceManager at " + rmAddress);
|
|
|
+ ApplicationClientProtocol client = (ApplicationClientProtocol) rpc
|
|
|
+ .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
|
|
|
+
|
|
|
+ GetAttributesToNodesRequest request =
|
|
|
+ GetAttributesToNodesRequest.newInstance();
|
|
|
+ GetAttributesToNodesResponse response =
|
|
|
+ client.getAttributesToNodes(request);
|
|
|
+ Map<NodeAttribute, Set<String>> attrs = response.getAttributesToNodes();
|
|
|
+ Assert.assertEquals(response.getAttributesToNodes().size(), 4);
|
|
|
+ Assert.assertEquals(attrs.get(dist).size(), 2);
|
|
|
+ Assert.assertEquals(attrs.get(os).size(), 1);
|
|
|
+ Assert.assertEquals(attrs.get(gpu).size(), 1);
|
|
|
+ Assert.assertTrue(attrs.get(dist).contains(node1));
|
|
|
+ Assert.assertTrue(attrs.get(dist).contains(node2));
|
|
|
+ Assert.assertTrue(attrs.get(docker).contains(node2));
|
|
|
+
|
|
|
+ GetAttributesToNodesRequest request2 =
|
|
|
+ GetAttributesToNodesRequest.newInstance(ImmutableSet.of(docker));
|
|
|
+ GetAttributesToNodesResponse response2 =
|
|
|
+ client.getAttributesToNodes(request2);
|
|
|
+ Map<NodeAttribute, Set<String>> attrs2 = response2.getAttributesToNodes();
|
|
|
+ Assert.assertEquals(response2.getAttributesToNodes().size(), 1);
|
|
|
+ Assert.assertTrue(attrs.get(docker).contains(node2));
|
|
|
+
|
|
|
+ GetAttributesToNodesRequest request3 =
|
|
|
+ GetAttributesToNodesRequest.newInstance(ImmutableSet.of(docker, os));
|
|
|
+ GetAttributesToNodesResponse response3 =
|
|
|
+ client.getAttributesToNodes(request3);
|
|
|
+ Map<NodeAttribute, Set<String>> attrs3 = response3.getAttributesToNodes();
|
|
|
+ Assert.assertEquals(response3.getAttributesToNodes().size(), 2);
|
|
|
+ Assert.assertTrue(attrs.get(os).contains(node1));
|
|
|
+ Assert.assertTrue(attrs.get(docker).contains(node2));
|
|
|
+ rpc.stopProxy(client, conf);
|
|
|
+ rm.close();
|
|
|
+ }
|
|
|
+
|
|
|
@Test(timeout = 120000)
|
|
|
public void testUpdatePriorityAndKillAppWithZeroClusterResource()
|
|
|
throws Exception {
|