|
@@ -24,12 +24,15 @@ import java.util.List;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
|
|
+import org.apache.hadoop.net.NetworkTopology;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
+import org.apache.hadoop.yarn.client.api.AMRMClient;
|
|
|
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
|
|
+import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
|
|
|
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
|
|
|
import org.junit.Test;
|
|
|
|
|
@@ -52,11 +55,159 @@ public class TestAMRMClientContainerRequest {
|
|
|
new ContainerRequest(capability, new String[] {"host1", "host2"},
|
|
|
new String[] {"/rack2"}, Priority.newInstance(1), 4);
|
|
|
client.addContainerRequest(request);
|
|
|
- verifyResourceRequestLocation(client, request, "host1");
|
|
|
- verifyResourceRequestLocation(client, request, "host2");
|
|
|
- verifyResourceRequestLocation(client, request, "/rack1");
|
|
|
- verifyResourceRequestLocation(client, request, "/rack2");
|
|
|
- verifyResourceRequestLocation(client, request, ResourceRequest.ANY);
|
|
|
+ verifyResourceRequest(client, request, "host1", true);
|
|
|
+ verifyResourceRequest(client, request, "host2", true);
|
|
|
+ verifyResourceRequest(client, request, "/rack1", true);
|
|
|
+ verifyResourceRequest(client, request, "/rack2", true);
|
|
|
+ verifyResourceRequest(client, request, ResourceRequest.ANY, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testDisableLocalityRelaxation() {
|
|
|
+ AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
|
|
|
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setClass(
|
|
|
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
|
|
+ MyResolver.class, DNSToSwitchMapping.class);
|
|
|
+ client.init(conf);
|
|
|
+
|
|
|
+ Resource capability = Resource.newInstance(1024, 1);
|
|
|
+ ContainerRequest nodeLevelRequest =
|
|
|
+ new ContainerRequest(capability, new String[] {"host1", "host2"},
|
|
|
+ null, Priority.newInstance(1), 4, false);
|
|
|
+ client.addContainerRequest(nodeLevelRequest);
|
|
|
+
|
|
|
+ verifyResourceRequest(client, nodeLevelRequest, ResourceRequest.ANY, false);
|
|
|
+ verifyResourceRequest(client, nodeLevelRequest, "/rack1", false);
|
|
|
+ verifyResourceRequest(client, nodeLevelRequest, "host1", true);
|
|
|
+ verifyResourceRequest(client, nodeLevelRequest, "host2", true);
|
|
|
+
|
|
|
+ // Make sure we don't get any errors with two node-level requests at the
|
|
|
+ // same priority
|
|
|
+ ContainerRequest nodeLevelRequest2 =
|
|
|
+ new ContainerRequest(capability, new String[] {"host2", "host3"},
|
|
|
+ null, Priority.newInstance(1), 4, false);
|
|
|
+ client.addContainerRequest(nodeLevelRequest2);
|
|
|
+
|
|
|
+ AMRMClient.ContainerRequest rackLevelRequest =
|
|
|
+ new AMRMClient.ContainerRequest(capability, null,
|
|
|
+ new String[] {"/rack3", "/rack4"}, Priority.newInstance(2), 3, false);
|
|
|
+ client.addContainerRequest(rackLevelRequest);
|
|
|
+
|
|
|
+ verifyResourceRequest(client, rackLevelRequest, ResourceRequest.ANY, false);
|
|
|
+ verifyResourceRequest(client, rackLevelRequest, "/rack3", true);
|
|
|
+ verifyResourceRequest(client, rackLevelRequest, "/rack4", true);
|
|
|
+
|
|
|
+ // Make sure we don't get any errors with two rack-level requests at the
|
|
|
+ // same priority
|
|
|
+ AMRMClient.ContainerRequest rackLevelRequest2 =
|
|
|
+ new AMRMClient.ContainerRequest(capability, null,
|
|
|
+ new String[] {"/rack4", "/rack5"}, Priority.newInstance(2), 3, false);
|
|
|
+ client.addContainerRequest(rackLevelRequest2);
|
|
|
+
|
|
|
+ ContainerRequest bothLevelRequest =
|
|
|
+ new ContainerRequest(capability, new String[] {"host3", "host4"},
|
|
|
+ new String[] {"rack1", "/otherrack"},
|
|
|
+ Priority.newInstance(3), 4, false);
|
|
|
+ client.addContainerRequest(bothLevelRequest);
|
|
|
+
|
|
|
+ verifyResourceRequest(client, bothLevelRequest, ResourceRequest.ANY, false);
|
|
|
+ verifyResourceRequest(client, bothLevelRequest, "rack1",
|
|
|
+ true);
|
|
|
+ verifyResourceRequest(client, bothLevelRequest, "/otherrack",
|
|
|
+ true);
|
|
|
+ verifyResourceRequest(client, bothLevelRequest, "host3", true);
|
|
|
+ verifyResourceRequest(client, bothLevelRequest, "host4", true);
|
|
|
+
|
|
|
+ // Make sure we don't get any errors with two both-level requests at the
|
|
|
+ // same priority
|
|
|
+ ContainerRequest bothLevelRequest2 =
|
|
|
+ new ContainerRequest(capability, new String[] {"host4", "host5"},
|
|
|
+ new String[] {"rack1", "/otherrack2"},
|
|
|
+ Priority.newInstance(3), 4, false);
|
|
|
+ client.addContainerRequest(bothLevelRequest2);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test (expected = InvalidContainerRequestException.class)
|
|
|
+ public void testDifferentLocalityRelaxationSamePriority() {
|
|
|
+ AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
|
|
|
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setClass(
|
|
|
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
|
|
+ MyResolver.class, DNSToSwitchMapping.class);
|
|
|
+ client.init(conf);
|
|
|
+
|
|
|
+ Resource capability = Resource.newInstance(1024, 1);
|
|
|
+ ContainerRequest request1 =
|
|
|
+ new ContainerRequest(capability, new String[] {"host1", "host2"},
|
|
|
+ null, Priority.newInstance(1), 4, false);
|
|
|
+ client.addContainerRequest(request1);
|
|
|
+ ContainerRequest request2 =
|
|
|
+ new ContainerRequest(capability, new String[] {"host3"},
|
|
|
+ null, Priority.newInstance(1), 4, true);
|
|
|
+ client.addContainerRequest(request2);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testInvalidValidWhenOldRemoved() {
|
|
|
+ AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
|
|
|
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setClass(
|
|
|
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
|
|
+ MyResolver.class, DNSToSwitchMapping.class);
|
|
|
+ client.init(conf);
|
|
|
+
|
|
|
+ Resource capability = Resource.newInstance(1024, 1);
|
|
|
+ ContainerRequest request1 =
|
|
|
+ new ContainerRequest(capability, new String[] {"host1", "host2"},
|
|
|
+ null, Priority.newInstance(1), 4, false);
|
|
|
+ client.addContainerRequest(request1);
|
|
|
+
|
|
|
+ client.removeContainerRequest(request1);
|
|
|
+
|
|
|
+ ContainerRequest request2 =
|
|
|
+ new ContainerRequest(capability, new String[] {"host3"},
|
|
|
+ null, Priority.newInstance(1), 4, true);
|
|
|
+ client.addContainerRequest(request2);
|
|
|
+
|
|
|
+ client.removeContainerRequest(request2);
|
|
|
+
|
|
|
+ ContainerRequest request3 =
|
|
|
+ new ContainerRequest(capability, new String[] {"host1", "host2"},
|
|
|
+ null, Priority.newInstance(1), 4, false);
|
|
|
+ client.addContainerRequest(request3);
|
|
|
+
|
|
|
+ client.removeContainerRequest(request3);
|
|
|
+
|
|
|
+ ContainerRequest request4 =
|
|
|
+ new ContainerRequest(capability, null,
|
|
|
+ new String[] {"rack1"}, Priority.newInstance(1), 4, true);
|
|
|
+ client.addContainerRequest(request4);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test (expected = InvalidContainerRequestException.class)
|
|
|
+ public void testLocalityRelaxationDifferentLevels() {
|
|
|
+ AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
|
|
|
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setClass(
|
|
|
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
|
|
|
+ MyResolver.class, DNSToSwitchMapping.class);
|
|
|
+ client.init(conf);
|
|
|
+
|
|
|
+ Resource capability = Resource.newInstance(1024, 1);
|
|
|
+ ContainerRequest request1 =
|
|
|
+ new ContainerRequest(capability, new String[] {"host1", "host2"},
|
|
|
+ null, Priority.newInstance(1), 4, false);
|
|
|
+ client.addContainerRequest(request1);
|
|
|
+ ContainerRequest request2 =
|
|
|
+ new ContainerRequest(capability, null,
|
|
|
+ new String[] {"rack1"}, Priority.newInstance(1), 4, true);
|
|
|
+ client.addContainerRequest(request2);
|
|
|
}
|
|
|
|
|
|
private static class MyResolver implements DNSToSwitchMapping {
|
|
@@ -70,12 +221,13 @@ public class TestAMRMClientContainerRequest {
|
|
|
public void reloadCachedMappings() {}
|
|
|
}
|
|
|
|
|
|
- private void verifyResourceRequestLocation(
|
|
|
+ private void verifyResourceRequest(
|
|
|
AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
|
|
|
- String location) {
|
|
|
+ String location, boolean expectedRelaxLocality) {
|
|
|
ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority())
|
|
|
.get(location).get(request.getCapability()).remoteRequest;
|
|
|
assertEquals(location, ask.getResourceName());
|
|
|
assertEquals(request.getContainerCount(), ask.getNumContainers());
|
|
|
+ assertEquals(expectedRelaxLocality, ask.getRelaxLocality());
|
|
|
}
|
|
|
}
|