|
@@ -22,6 +22,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
|
|
|
+
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeAttribute;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
|
|
|
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
|
|
@@ -2125,6 +2128,89 @@ public class TestClientRMService {
|
|
|
rm.close();
|
|
|
}
|
|
|
|
|
|
+ @Test(timeout = 120000)
|
|
|
+ public void testGetNodesToAttributes() throws IOException, YarnException {
|
|
|
+ MockRM rm = new MockRM() {
|
|
|
+ protected ClientRMService createClientRMService() {
|
|
|
+ return new ClientRMService(this.rmContext, scheduler, this.rmAppManager,
|
|
|
+ this.applicationACLsManager, this.queueACLsManager,
|
|
|
+ this.getRMContext().getRMDelegationTokenSecretManager());
|
|
|
+ }
|
|
|
+ };
|
|
|
+ rm.start();
|
|
|
+
|
|
|
+ NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager();
|
|
|
+ String node1 = "host1";
|
|
|
+ String node2 = "host2";
|
|
|
+ NodeAttribute gpu = NodeAttribute
|
|
|
+ .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
|
|
|
+ NodeAttributeType.STRING, "nvida");
|
|
|
+ NodeAttribute os = NodeAttribute
|
|
|
+ .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS",
|
|
|
+ NodeAttributeType.STRING, "windows64");
|
|
|
+ NodeAttribute docker = NodeAttribute
|
|
|
+ .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
|
|
|
+ NodeAttributeType.STRING, "docker0");
|
|
|
+ NodeAttribute dist = NodeAttribute
|
|
|
+ .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION",
|
|
|
+ NodeAttributeType.STRING, "3_0_2");
|
|
|
+ Map<String, Set<NodeAttribute>> nodes = new HashMap<>();
|
|
|
+ nodes.put(node1, ImmutableSet.of(gpu, os, dist));
|
|
|
+ nodes.put(node2, ImmutableSet.of(docker, dist));
|
|
|
+ mgr.addNodeAttributes(nodes);
|
|
|
+ // Create a client.
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ YarnRPC rpc = YarnRPC.create(conf);
|
|
|
+ InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
|
|
|
+ LOG.info("Connecting to ResourceManager at " + rmAddress);
|
|
|
+ ApplicationClientProtocol client = (ApplicationClientProtocol) rpc
|
|
|
+ .getProxy(ApplicationClientProtocol.class, rmAddress, conf);
|
|
|
+
|
|
|
+ // Specify null for hostnames.
|
|
|
+ GetNodesToAttributesRequest request1 =
|
|
|
+ GetNodesToAttributesRequest.newInstance(null);
|
|
|
+ GetNodesToAttributesResponse response1 =
|
|
|
+ client.getNodesToAttributes(request1);
|
|
|
+ Map<String, Set<NodeAttribute>> hostToAttrs =
|
|
|
+ response1.getNodeToAttributes();
|
|
|
+ Assert.assertEquals(2, hostToAttrs.size());
|
|
|
+
|
|
|
+ Assert.assertTrue(hostToAttrs.get(node2).contains(dist));
|
|
|
+ Assert.assertTrue(hostToAttrs.get(node2).contains(docker));
|
|
|
+ Assert.assertTrue(hostToAttrs.get(node1).contains(dist));
|
|
|
+
|
|
|
+ // Specify particular node
|
|
|
+ GetNodesToAttributesRequest request2 =
|
|
|
+ GetNodesToAttributesRequest.newInstance(ImmutableSet.of(node1));
|
|
|
+ GetNodesToAttributesResponse response2 =
|
|
|
+ client.getNodesToAttributes(request2);
|
|
|
+ hostToAttrs = response2.getNodeToAttributes();
|
|
|
+ Assert.assertEquals(1, response2.getNodeToAttributes().size());
|
|
|
+ Assert.assertTrue(hostToAttrs.get(node1).contains(dist));
|
|
|
+
|
|
|
+ // Test queury with empty set
|
|
|
+ GetNodesToAttributesRequest request3 =
|
|
|
+ GetNodesToAttributesRequest.newInstance(Collections.emptySet());
|
|
|
+ GetNodesToAttributesResponse response3 =
|
|
|
+ client.getNodesToAttributes(request3);
|
|
|
+ hostToAttrs = response3.getNodeToAttributes();
|
|
|
+ Assert.assertEquals(2, hostToAttrs.size());
|
|
|
+
|
|
|
+ Assert.assertTrue(hostToAttrs.get(node2).contains(dist));
|
|
|
+ Assert.assertTrue(hostToAttrs.get(node2).contains(docker));
|
|
|
+ Assert.assertTrue(hostToAttrs.get(node1).contains(dist));
|
|
|
+
|
|
|
+ // test invalid hostname
|
|
|
+ GetNodesToAttributesRequest request4 =
|
|
|
+ GetNodesToAttributesRequest.newInstance(ImmutableSet.of("invalid"));
|
|
|
+ GetNodesToAttributesResponse response4 =
|
|
|
+ client.getNodesToAttributes(request4);
|
|
|
+ hostToAttrs = response4.getNodeToAttributes();
|
|
|
+ Assert.assertEquals(0, hostToAttrs.size());
|
|
|
+ rpc.stopProxy(client, conf);
|
|
|
+ rm.close();
|
|
|
+ }
|
|
|
+
|
|
|
@Test(timeout = 120000)
|
|
|
public void testUpdatePriorityAndKillAppWithZeroClusterResource()
|
|
|
throws Exception {
|