|
@@ -102,7 +102,7 @@ public class TestAMRMClient {
|
|
|
private List<NodeReport> nodeReports = null;
|
|
|
private ApplicationAttemptId attemptId = null;
|
|
|
private int nodeCount = 3;
|
|
|
-
|
|
|
+
|
|
|
static final int rolling_interval_sec = 13;
|
|
|
static final long am_expire_ms = 4000;
|
|
|
|
|
@@ -139,8 +139,8 @@ public class TestAMRMClient {
|
|
|
this.conf = conf;
|
|
|
conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName);
|
|
|
conf.setLong(
|
|
|
- YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
|
|
|
- rolling_interval_sec);
|
|
|
+ YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
|
|
|
+ rolling_interval_sec);
|
|
|
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms);
|
|
|
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
|
|
|
// set the minimum allocation so that resource decrease can go under 1024
|
|
@@ -161,7 +161,7 @@ public class TestAMRMClient {
|
|
|
|
|
|
// get node info
|
|
|
assertTrue("All node managers did not connect to the RM within the "
|
|
|
- + "allotted 5-second timeout",
|
|
|
+ + "allotted 5-second timeout",
|
|
|
yarnCluster.waitForNodeManagersToConnect(5000L));
|
|
|
nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
|
|
|
assertEquals("Not all node managers were reported running",
|
|
@@ -177,7 +177,7 @@ public class TestAMRMClient {
|
|
|
racks = new String[]{ rack };
|
|
|
|
|
|
// submit new app
|
|
|
- ApplicationSubmissionContext appContext =
|
|
|
+ ApplicationSubmissionContext appContext =
|
|
|
yarnClient.createApplication().getApplicationSubmissionContext();
|
|
|
ApplicationId appId = appContext.getApplicationId();
|
|
|
// set the application name
|
|
@@ -191,10 +191,10 @@ public class TestAMRMClient {
|
|
|
// Set up the container launch context for the application master
|
|
|
ContainerLaunchContext amContainer =
|
|
|
BuilderUtils.newContainerLaunchContext(
|
|
|
- Collections.<String, LocalResource> emptyMap(),
|
|
|
- new HashMap<String, String>(), Arrays.asList("sleep", "100"),
|
|
|
- new HashMap<String, ByteBuffer>(), null,
|
|
|
- new HashMap<ApplicationAccessType, String>());
|
|
|
+ Collections.<String, LocalResource> emptyMap(),
|
|
|
+ new HashMap<String, String>(), Arrays.asList("sleep", "100"),
|
|
|
+ new HashMap<String, ByteBuffer>(), null,
|
|
|
+ new HashMap<ApplicationAccessType, String>());
|
|
|
appContext.setAMContainerSpec(amContainer);
|
|
|
appContext.setResource(Resource.newInstance(1024, 1));
|
|
|
// Create the request to send to the applications manager
|
|
@@ -212,7 +212,7 @@ public class TestAMRMClient {
|
|
|
attemptId = appReport.getCurrentApplicationAttemptId();
|
|
|
appAttempt =
|
|
|
yarnCluster.getResourceManager().getRMContext().getRMApps()
|
|
|
- .get(attemptId.getApplicationId()).getCurrentAppAttempt();
|
|
|
+ .get(attemptId.getApplicationId()).getCurrentAppAttempt();
|
|
|
while (true) {
|
|
|
if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
|
|
|
break;
|
|
@@ -224,14 +224,14 @@ public class TestAMRMClient {
|
|
|
// Just dig into the ResourceManager and get the AMRMToken just for the sake
|
|
|
// of testing.
|
|
|
UserGroupInformation.setLoginUser(UserGroupInformation
|
|
|
- .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
|
|
|
+ .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
|
|
|
|
|
|
// emulate RM setup of AMRM token in credentials by adding the token
|
|
|
// *before* setting the token service
|
|
|
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
|
|
|
appAttempt.getAMRMToken().setService(ClientRMProxy.getAMRMTokenService(conf));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@After
|
|
|
public void teardown() throws YarnException, IOException {
|
|
|
yarnClient.killApplication(attemptId.getApplicationId());
|
|
@@ -258,7 +258,7 @@ public class TestAMRMClient {
|
|
|
amClient.getMatchingRequests(priority, node, testCapability1);
|
|
|
assertEquals("Expected no matching requests.", matches.size(), 0);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=60000)
|
|
|
public void testAMRMClientMatchingFit() throws YarnException, IOException {
|
|
|
AMRMClient<ContainerRequest> amClient = null;
|
|
@@ -268,7 +268,7 @@ public class TestAMRMClient {
|
|
|
amClient.init(conf);
|
|
|
amClient.start();
|
|
|
amClient.registerApplicationMaster("Host", 10000, "");
|
|
|
-
|
|
|
+
|
|
|
Resource capability1 = Resource.newInstance(1024, 2);
|
|
|
Resource capability2 = Resource.newInstance(1024, 1);
|
|
|
Resource capability3 = Resource.newInstance(1000, 2);
|
|
@@ -277,19 +277,19 @@ public class TestAMRMClient {
|
|
|
Resource capability6 = Resource.newInstance(2000, 1);
|
|
|
Resource capability7 = Resource.newInstance(2000, 1);
|
|
|
|
|
|
- ContainerRequest storedContainer1 =
|
|
|
+ ContainerRequest storedContainer1 =
|
|
|
new ContainerRequest(capability1, nodes, racks, priority);
|
|
|
- ContainerRequest storedContainer2 =
|
|
|
+ ContainerRequest storedContainer2 =
|
|
|
new ContainerRequest(capability2, nodes, racks, priority);
|
|
|
- ContainerRequest storedContainer3 =
|
|
|
+ ContainerRequest storedContainer3 =
|
|
|
new ContainerRequest(capability3, nodes, racks, priority);
|
|
|
- ContainerRequest storedContainer4 =
|
|
|
+ ContainerRequest storedContainer4 =
|
|
|
new ContainerRequest(capability4, nodes, racks, priority);
|
|
|
- ContainerRequest storedContainer5 =
|
|
|
+ ContainerRequest storedContainer5 =
|
|
|
new ContainerRequest(capability5, nodes, racks, priority);
|
|
|
- ContainerRequest storedContainer6 =
|
|
|
+ ContainerRequest storedContainer6 =
|
|
|
new ContainerRequest(capability6, nodes, racks, priority);
|
|
|
- ContainerRequest storedContainer7 =
|
|
|
+ ContainerRequest storedContainer7 =
|
|
|
new ContainerRequest(capability7, nodes, racks, priority2, false);
|
|
|
amClient.addContainerRequest(storedContainer1);
|
|
|
amClient.addContainerRequest(storedContainer2);
|
|
@@ -310,7 +310,7 @@ public class TestAMRMClient {
|
|
|
amClient.addContainerRequest(storedContainer11);
|
|
|
amClient.addContainerRequest(storedContainer33);
|
|
|
amClient.addContainerRequest(storedContainer43);
|
|
|
-
|
|
|
+
|
|
|
// test matching of containers
|
|
|
List<? extends Collection<ContainerRequest>> matches;
|
|
|
ContainerRequest storedRequest;
|
|
@@ -340,7 +340,7 @@ public class TestAMRMClient {
|
|
|
storedRequest = iter.next();
|
|
|
assertEquals(storedContainer33, storedRequest);
|
|
|
amClient.removeContainerRequest(storedContainer33);
|
|
|
-
|
|
|
+
|
|
|
// exact matching with order maintained
|
|
|
Resource testCapability2 = Resource.newInstance(2000, 1);
|
|
|
matches = amClient.getMatchingRequests(priority, node, testCapability2);
|
|
@@ -355,12 +355,12 @@ public class TestAMRMClient {
|
|
|
}
|
|
|
}
|
|
|
amClient.removeContainerRequest(storedContainer6);
|
|
|
-
|
|
|
+
|
|
|
// matching with larger container. all requests returned
|
|
|
Resource testCapability3 = Resource.newInstance(4000, 4);
|
|
|
matches = amClient.getMatchingRequests(priority, node, testCapability3);
|
|
|
assert(matches.size() == 4);
|
|
|
-
|
|
|
+
|
|
|
Resource testCapability4 = Resource.newInstance(1024, 2);
|
|
|
matches = amClient.getMatchingRequests(priority, node, testCapability4);
|
|
|
assert(matches.size() == 2);
|
|
@@ -370,14 +370,14 @@ public class TestAMRMClient {
|
|
|
ContainerRequest testRequest = testSet.iterator().next();
|
|
|
assertTrue(testRequest != storedContainer4);
|
|
|
assertTrue(testRequest != storedContainer5);
|
|
|
- assert(testRequest == storedContainer2 ||
|
|
|
- testRequest == storedContainer3);
|
|
|
+ assert(testRequest == storedContainer2 ||
|
|
|
+ testRequest == storedContainer3);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
Resource testCapability5 = Resource.newInstance(512, 4);
|
|
|
matches = amClient.getMatchingRequests(priority, node, testCapability5);
|
|
|
assert(matches.size() == 0);
|
|
|
-
|
|
|
+
|
|
|
// verify requests without relaxed locality are only returned at specific
|
|
|
// locations
|
|
|
Resource testCapability7 = Resource.newInstance(2000, 1);
|
|
@@ -386,7 +386,7 @@ public class TestAMRMClient {
|
|
|
assert(matches.size() == 0);
|
|
|
matches = amClient.getMatchingRequests(priority2, node, testCapability7);
|
|
|
assert(matches.size() == 1);
|
|
|
-
|
|
|
+
|
|
|
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
|
|
|
null, null);
|
|
|
|
|
@@ -529,14 +529,14 @@ public class TestAMRMClient {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private void verifyMatches(
|
|
|
- List<? extends Collection<ContainerRequest>> matches,
|
|
|
- int matchSize) {
|
|
|
+ List<? extends Collection<ContainerRequest>> matches,
|
|
|
+ int matchSize) {
|
|
|
assertEquals(1, matches.size());
|
|
|
assertEquals(matchSize, matches.get(0).size());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=60000)
|
|
|
public void testAMRMClientMatchingFitInferredRack()
|
|
|
throws YarnException, IOException {
|
|
@@ -591,19 +591,19 @@ public class TestAMRMClient {
|
|
|
// start am rm client
|
|
|
amClient =
|
|
|
(AMRMClientImpl<ContainerRequest>) AMRMClient
|
|
|
- .<ContainerRequest> createAMRMClient();
|
|
|
+ .<ContainerRequest> createAMRMClient();
|
|
|
amClient.init(conf);
|
|
|
amClient.start();
|
|
|
amClient.registerApplicationMaster("Host", 10000, "");
|
|
|
-
|
|
|
+
|
|
|
Priority priority1 = Records.newRecord(Priority.class);
|
|
|
priority1.setPriority(2);
|
|
|
-
|
|
|
- ContainerRequest storedContainer1 =
|
|
|
+
|
|
|
+ ContainerRequest storedContainer1 =
|
|
|
new ContainerRequest(capability, nodes, racks, priority);
|
|
|
- ContainerRequest storedContainer2 =
|
|
|
+ ContainerRequest storedContainer2 =
|
|
|
new ContainerRequest(capability, nodes, racks, priority);
|
|
|
- ContainerRequest storedContainer3 =
|
|
|
+ ContainerRequest storedContainer3 =
|
|
|
new ContainerRequest(capability, null, null, priority1);
|
|
|
amClient.addContainerRequest(storedContainer1);
|
|
|
amClient.addContainerRequest(storedContainer2);
|
|
@@ -611,7 +611,7 @@ public class TestAMRMClient {
|
|
|
|
|
|
ProfileCapability profileCapability =
|
|
|
ProfileCapability.newInstance(capability);
|
|
|
-
|
|
|
+
|
|
|
// test addition and storage
|
|
|
RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
|
|
|
amClient.getTable(0);
|
|
@@ -622,21 +622,21 @@ public class TestAMRMClient {
|
|
|
containersRequestedAny = remoteRequestsTable.get(priority1,
|
|
|
ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
|
|
|
.remoteRequest.getNumContainers();
|
|
|
- assertEquals(1, containersRequestedAny);
|
|
|
- List<? extends Collection<ContainerRequest>> matches =
|
|
|
+ assertEquals(1, containersRequestedAny);
|
|
|
+ List<? extends Collection<ContainerRequest>> matches =
|
|
|
amClient.getMatchingRequests(priority, node, capability);
|
|
|
verifyMatches(matches, 2);
|
|
|
matches = amClient.getMatchingRequests(priority, rack, capability);
|
|
|
verifyMatches(matches, 2);
|
|
|
- matches =
|
|
|
+ matches =
|
|
|
amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
|
|
|
verifyMatches(matches, 2);
|
|
|
matches = amClient.getMatchingRequests(priority1, rack, capability);
|
|
|
assertTrue(matches.isEmpty());
|
|
|
- matches =
|
|
|
+ matches =
|
|
|
amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
|
|
|
verifyMatches(matches, 1);
|
|
|
-
|
|
|
+
|
|
|
// test removal
|
|
|
amClient.removeContainerRequest(storedContainer3);
|
|
|
matches = amClient.getMatchingRequests(priority, node, capability);
|
|
@@ -646,20 +646,20 @@ public class TestAMRMClient {
|
|
|
verifyMatches(matches, 1);
|
|
|
matches = amClient.getMatchingRequests(priority, rack, capability);
|
|
|
verifyMatches(matches, 1);
|
|
|
-
|
|
|
+
|
|
|
// test matching of containers
|
|
|
ContainerRequest storedRequest = matches.get(0).iterator().next();
|
|
|
assertEquals(storedContainer1, storedRequest);
|
|
|
amClient.removeContainerRequest(storedContainer1);
|
|
|
- matches =
|
|
|
+ matches =
|
|
|
amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
|
|
|
assertTrue(matches.isEmpty());
|
|
|
- matches =
|
|
|
+ matches =
|
|
|
amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
|
|
|
assertTrue(matches.isEmpty());
|
|
|
// 0 requests left. everything got cleaned up
|
|
|
assertTrue(amClient.getTable(0).isEmpty());
|
|
|
-
|
|
|
+
|
|
|
// go through an exemplary allocation, matching and release cycle
|
|
|
amClient.addContainerRequest(storedContainer1);
|
|
|
amClient.addContainerRequest(storedContainer3);
|
|
@@ -673,16 +673,16 @@ public class TestAMRMClient {
|
|
|
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
|
|
assertEquals(0, amClient.ask.size());
|
|
|
assertEquals(0, amClient.release.size());
|
|
|
-
|
|
|
+
|
|
|
assertEquals(nodeCount, amClient.getClusterNodeCount());
|
|
|
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
|
|
|
for(Container container : allocResponse.getAllocatedContainers()) {
|
|
|
- ContainerRequest expectedRequest =
|
|
|
+ ContainerRequest expectedRequest =
|
|
|
container.getPriority().equals(storedContainer1.getPriority()) ?
|
|
|
storedContainer1 : storedContainer3;
|
|
|
- matches = amClient.getMatchingRequests(container.getPriority(),
|
|
|
- ResourceRequest.ANY,
|
|
|
- container.getResource());
|
|
|
+ matches = amClient.getMatchingRequests(container.getPriority(),
|
|
|
+ ResourceRequest.ANY,
|
|
|
+ container.getResource());
|
|
|
// test correct matched container is returned
|
|
|
verifyMatches(matches, 1);
|
|
|
ContainerRequest matchedRequest = matches.get(0).iterator().next();
|
|
@@ -696,7 +696,7 @@ public class TestAMRMClient {
|
|
|
triggerSchedulingWithNMHeartBeat();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
assertEquals(2, allocatedContainerCount);
|
|
|
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
|
|
assertEquals(0, amClient.release.size());
|
|
@@ -704,7 +704,7 @@ public class TestAMRMClient {
|
|
|
assertEquals(0, allocResponse.getAllocatedContainers().size());
|
|
|
// 0 requests left. everything got cleaned up
|
|
|
assertTrue(remoteRequestsTable.isEmpty());
|
|
|
-
|
|
|
+
|
|
|
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
|
|
|
null, null);
|
|
|
|
|
@@ -742,46 +742,46 @@ public class TestAMRMClient {
|
|
|
// start am rm client
|
|
|
amClient =
|
|
|
(AMRMClientImpl<ContainerRequest>) AMRMClient
|
|
|
- .<ContainerRequest> createAMRMClient();
|
|
|
+ .<ContainerRequest> createAMRMClient();
|
|
|
amClient.init(conf);
|
|
|
amClient.start();
|
|
|
amClient.registerApplicationMaster("Host", 10000, "");
|
|
|
-
|
|
|
+
|
|
|
assertEquals(0, amClient.ask.size());
|
|
|
assertEquals(0, amClient.release.size());
|
|
|
-
|
|
|
- ContainerRequest storedContainer1 =
|
|
|
+
|
|
|
+ ContainerRequest storedContainer1 =
|
|
|
new ContainerRequest(capability, nodes, racks, priority);
|
|
|
amClient.addContainerRequest(storedContainer1);
|
|
|
assertEquals(3, amClient.ask.size());
|
|
|
assertEquals(0, amClient.release.size());
|
|
|
-
|
|
|
+
|
|
|
List<String> localNodeBlacklist = new ArrayList<String>();
|
|
|
localNodeBlacklist.add(node);
|
|
|
-
|
|
|
+
|
|
|
// put node in black list, so no container assignment
|
|
|
amClient.updateBlacklist(localNodeBlacklist, null);
|
|
|
|
|
|
int allocatedContainerCount = getAllocatedContainersNumber(amClient,
|
|
|
- DEFAULT_ITERATION);
|
|
|
+ DEFAULT_ITERATION);
|
|
|
// the only node is in blacklist, so no allocation
|
|
|
assertEquals(0, allocatedContainerCount);
|
|
|
|
|
|
// Remove node from blacklist, so get assigned with 2
|
|
|
amClient.updateBlacklist(null, localNodeBlacklist);
|
|
|
- ContainerRequest storedContainer2 =
|
|
|
- new ContainerRequest(capability, nodes, racks, priority);
|
|
|
+ ContainerRequest storedContainer2 =
|
|
|
+ new ContainerRequest(capability, nodes, racks, priority);
|
|
|
amClient.addContainerRequest(storedContainer2);
|
|
|
allocatedContainerCount = getAllocatedContainersNumber(amClient,
|
|
|
DEFAULT_ITERATION);
|
|
|
assertEquals(2, allocatedContainerCount);
|
|
|
-
|
|
|
+
|
|
|
// Test in case exception in allocate(), blacklist is kept
|
|
|
assertTrue(amClient.blacklistAdditions.isEmpty());
|
|
|
assertTrue(amClient.blacklistRemovals.isEmpty());
|
|
|
-
|
|
|
+
|
|
|
// create a invalid ContainerRequest - memory value is minus
|
|
|
- ContainerRequest invalidContainerRequest =
|
|
|
+ ContainerRequest invalidContainerRequest =
|
|
|
new ContainerRequest(Resource.newInstance(-1024, 1),
|
|
|
nodes, racks, priority);
|
|
|
amClient.addContainerRequest(invalidContainerRequest);
|
|
@@ -799,7 +799,7 @@ public class TestAMRMClient {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test (timeout=60000)
|
|
|
public void testAMRMClientWithBlacklist() throws YarnException, IOException {
|
|
|
AMRMClientImpl<ContainerRequest> amClient = null;
|
|
@@ -807,12 +807,12 @@ public class TestAMRMClient {
|
|
|
// start am rm client
|
|
|
amClient =
|
|
|
(AMRMClientImpl<ContainerRequest>) AMRMClient
|
|
|
- .<ContainerRequest> createAMRMClient();
|
|
|
+ .<ContainerRequest> createAMRMClient();
|
|
|
amClient.init(conf);
|
|
|
amClient.start();
|
|
|
amClient.registerApplicationMaster("Host", 10000, "");
|
|
|
String[] nodes = {"node1", "node2", "node3"};
|
|
|
-
|
|
|
+
|
|
|
// Add nodes[0] and nodes[1]
|
|
|
List<String> nodeList01 = new ArrayList<String>();
|
|
|
nodeList01.add(nodes[0]);
|
|
@@ -820,7 +820,7 @@ public class TestAMRMClient {
|
|
|
amClient.updateBlacklist(nodeList01, null);
|
|
|
assertEquals(2, amClient.blacklistAdditions.size());
|
|
|
assertEquals(0, amClient.blacklistRemovals.size());
|
|
|
-
|
|
|
+
|
|
|
// Add nodes[0] again, verify it is not added duplicated.
|
|
|
List<String> nodeList02 = new ArrayList<String>();
|
|
|
nodeList02.add(nodes[0]);
|
|
@@ -828,8 +828,8 @@ public class TestAMRMClient {
|
|
|
amClient.updateBlacklist(nodeList02, null);
|
|
|
assertEquals(3, amClient.blacklistAdditions.size());
|
|
|
assertEquals(0, amClient.blacklistRemovals.size());
|
|
|
-
|
|
|
- // Add nodes[1] and nodes[2] to removal list,
|
|
|
+
|
|
|
+ // Add nodes[1] and nodes[2] to removal list,
|
|
|
// Verify addition list remove these two nodes.
|
|
|
List<String> nodeList12 = new ArrayList<String>();
|
|
|
nodeList12.add(nodes[1]);
|
|
@@ -837,8 +837,8 @@ public class TestAMRMClient {
|
|
|
amClient.updateBlacklist(null, nodeList12);
|
|
|
assertEquals(1, amClient.blacklistAdditions.size());
|
|
|
assertEquals(2, amClient.blacklistRemovals.size());
|
|
|
-
|
|
|
- // Add nodes[1] again to addition list,
|
|
|
+
|
|
|
+ // Add nodes[1] again to addition list,
|
|
|
// Verify removal list will remove this node.
|
|
|
List<String> nodeList1 = new ArrayList<String>();
|
|
|
nodeList1.add(nodes[1]);
|
|
@@ -862,10 +862,10 @@ public class TestAMRMClient {
|
|
|
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
|
|
assertEquals(0, amClient.ask.size());
|
|
|
assertEquals(0, amClient.release.size());
|
|
|
-
|
|
|
+
|
|
|
assertEquals(nodeCount, amClient.getClusterNodeCount());
|
|
|
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
|
|
|
-
|
|
|
+
|
|
|
if(allocatedContainerCount == 0) {
|
|
|
// let NM heartbeat to RM and trigger allocations
|
|
|
triggerSchedulingWithNMHeartBeat();
|
|
@@ -905,7 +905,7 @@ public class TestAMRMClient {
|
|
|
//setting an instance NMTokenCache
|
|
|
amClient.setNMTokenCache(new NMTokenCache());
|
|
|
//asserting we are not using the singleton instance cache
|
|
|
- Assert.assertNotSame(NMTokenCache.getSingleton(),
|
|
|
+ Assert.assertNotSame(NMTokenCache.getSingleton(),
|
|
|
amClient.getNMTokenCache());
|
|
|
|
|
|
amClient.init(conf);
|
|
@@ -928,7 +928,7 @@ public class TestAMRMClient {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test(timeout=30000)
|
|
|
public void testAskWithNodeLabels() {
|
|
|
AMRMClientImpl<ContainerRequest> client =
|
|
@@ -949,7 +949,7 @@ public class TestAMRMClient {
|
|
|
assertEquals(1, client.ask.size());
|
|
|
assertEquals("a", client.ask.iterator().next()
|
|
|
.getNodeLabelExpression());
|
|
|
-
|
|
|
+
|
|
|
// add exp=x to ANY, rack and node, only resource request has ANY resource
|
|
|
// name will be assigned the label expression
|
|
|
// add exp=x then add exp=a to ANY in same priority, only exp=a should kept
|
|
@@ -976,7 +976,7 @@ public class TestAMRMClient {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private void verifyAddRequestFailed(AMRMClient<ContainerRequest> client,
|
|
|
ContainerRequest request) {
|
|
|
try {
|
|
@@ -986,7 +986,7 @@ public class TestAMRMClient {
|
|
|
}
|
|
|
fail();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
@Test(timeout=30000)
|
|
|
public void testAskWithInvalidNodeLabels() {
|
|
|
AMRMClientImpl<ContainerRequest> client =
|
|
@@ -1191,7 +1191,7 @@ public class TestAMRMClient {
|
|
|
.newInstance(ExecutionType.OPPORTUNISTIC, true)));
|
|
|
|
|
|
ProfileCapability profileCapability =
|
|
|
- ProfileCapability.newInstance(capability);
|
|
|
+ ProfileCapability.newInstance(capability);
|
|
|
int oppContainersRequestedAny =
|
|
|
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
|
|
|
ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
|
|
@@ -1495,10 +1495,10 @@ public class TestAMRMClient {
|
|
|
private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
|
|
|
throws YarnException, IOException {
|
|
|
// setup container request
|
|
|
-
|
|
|
+
|
|
|
assertEquals(0, amClient.ask.size());
|
|
|
assertEquals(0, amClient.release.size());
|
|
|
-
|
|
|
+
|
|
|
amClient.addContainerRequest(
|
|
|
new ContainerRequest(capability, nodes, racks, priority));
|
|
|
amClient.addContainerRequest(
|
|
@@ -1519,17 +1519,17 @@ public class TestAMRMClient {
|
|
|
int allocatedContainerCount = 0;
|
|
|
int iterationsLeft = 3;
|
|
|
Set<ContainerId> releases = new TreeSet<ContainerId>();
|
|
|
-
|
|
|
+
|
|
|
amClient.getNMTokenCache().clearCache();
|
|
|
assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache());
|
|
|
HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
|
|
|
-
|
|
|
+
|
|
|
while (allocatedContainerCount < containersRequestedAny
|
|
|
&& iterationsLeft-- > 0) {
|
|
|
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
|
|
assertEquals(0, amClient.ask.size());
|
|
|
assertEquals(0, amClient.release.size());
|
|
|
-
|
|
|
+
|
|
|
assertEquals(nodeCount, amClient.getClusterNodeCount());
|
|
|
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
|
|
|
for(Container container : allocResponse.getAllocatedContainers()) {
|
|
@@ -1537,7 +1537,7 @@ public class TestAMRMClient {
|
|
|
releases.add(rejectContainerId);
|
|
|
amClient.releaseAssignedContainer(rejectContainerId);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
for (NMToken token : allocResponse.getNMTokens()) {
|
|
|
String nodeID = token.getNodeId().toString();
|
|
|
if (receivedNMTokens.containsKey(nodeID)) {
|
|
@@ -1545,21 +1545,21 @@ public class TestAMRMClient {
|
|
|
}
|
|
|
receivedNMTokens.put(nodeID, token.getToken());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
if(allocatedContainerCount < containersRequestedAny) {
|
|
|
// let NM heartbeat to RM and trigger allocations
|
|
|
triggerSchedulingWithNMHeartBeat();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Should receive atleast 1 token
|
|
|
assertTrue(receivedNMTokens.size() > 0
|
|
|
&& receivedNMTokens.size() <= nodeCount);
|
|
|
-
|
|
|
+
|
|
|
assertEquals(allocatedContainerCount, containersRequestedAny);
|
|
|
assertEquals(2, releases.size());
|
|
|
assertEquals(0, amClient.ask.size());
|
|
|
-
|
|
|
+
|
|
|
// need to tell the AMRMClient that we dont need these resources anymore
|
|
|
amClient.removeContainerRequest(
|
|
|
new ContainerRequest(capability, nodes, racks, priority));
|
|
@@ -1569,7 +1569,7 @@ public class TestAMRMClient {
|
|
|
// send 0 container count request for resources that are no longer needed
|
|
|
ResourceRequest snoopRequest = amClient.ask.iterator().next();
|
|
|
assertEquals(0, snoopRequest.getNumContainers());
|
|
|
-
|
|
|
+
|
|
|
// test RPC exception handling
|
|
|
amClient.addContainerRequest(new ContainerRequest(capability, nodes,
|
|
|
racks, priority));
|
|
@@ -1577,7 +1577,7 @@ public class TestAMRMClient {
|
|
|
racks, priority));
|
|
|
snoopRequest = amClient.ask.iterator().next();
|
|
|
assertEquals(2, snoopRequest.getNumContainers());
|
|
|
-
|
|
|
+
|
|
|
ApplicationMasterProtocol realRM = amClient.rmClient;
|
|
|
try {
|
|
|
ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol.class);
|
|
@@ -1586,8 +1586,8 @@ public class TestAMRMClient {
|
|
|
public AllocateResponse answer(InvocationOnMock invocation)
|
|
|
throws Exception {
|
|
|
amClient.removeContainerRequest(
|
|
|
- new ContainerRequest(capability, nodes,
|
|
|
- racks, priority));
|
|
|
+ new ContainerRequest(capability, nodes,
|
|
|
+ racks, priority));
|
|
|
amClient.removeContainerRequest(
|
|
|
new ContainerRequest(capability, nodes, racks, priority));
|
|
|
throw new Exception();
|
|
@@ -1603,7 +1603,7 @@ public class TestAMRMClient {
|
|
|
assertEquals(2, amClient.release.size());
|
|
|
assertEquals(3, amClient.ask.size());
|
|
|
snoopRequest = amClient.ask.iterator().next();
|
|
|
- // verify that the remove request made in between makeRequest and allocate
|
|
|
+ // verify that the remove request made in between makeRequest and allocate
|
|
|
// has not been lost
|
|
|
assertEquals(0, snoopRequest.getNumContainers());
|
|
|
|
|
@@ -1765,7 +1765,7 @@ public class TestAMRMClient {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private void sleep(int sleepTime) {
|
|
|
try {
|
|
|
Thread.sleep(sleepTime);
|
|
@@ -1781,7 +1781,7 @@ public class TestAMRMClient {
|
|
|
try {
|
|
|
AMRMTokenSecretManager amrmTokenSecretManager =
|
|
|
yarnCluster.getResourceManager().getRMContext()
|
|
|
- .getAMRMTokenSecretManager();
|
|
|
+ .getAMRMTokenSecretManager();
|
|
|
|
|
|
// start am rm client
|
|
|
amClient = AMRMClient.<ContainerRequest> createAMRMClient();
|
|
@@ -1796,7 +1796,7 @@ public class TestAMRMClient {
|
|
|
getAMRMToken();
|
|
|
Assert.assertNotNull(amrmToken_1);
|
|
|
assertEquals(amrmToken_1.decodeIdentifier().getKeyId(),
|
|
|
- amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
|
|
|
+ amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
|
|
|
|
|
|
// Wait for enough time and make sure the roll_over happens
|
|
|
// At mean time, the old AMRMToken should continue to work
|
|
@@ -1811,41 +1811,41 @@ public class TestAMRMClient {
|
|
|
getAMRMToken();
|
|
|
Assert.assertNotNull(amrmToken_2);
|
|
|
assertEquals(amrmToken_2.decodeIdentifier().getKeyId(),
|
|
|
- amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
|
|
|
+ amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
|
|
|
|
|
|
Assert.assertNotEquals(amrmToken_1, amrmToken_2);
|
|
|
|
|
|
// can do the allocate call with latest AMRMToken
|
|
|
AllocateResponse response = amClient.allocate(0.1f);
|
|
|
-
|
|
|
+
|
|
|
// Verify latest AMRMToken can be used to send allocation request.
|
|
|
UserGroupInformation testUser1 =
|
|
|
UserGroupInformation.createRemoteUser("testUser1");
|
|
|
-
|
|
|
- AMRMTokenIdentifierForTest newVersionTokenIdentifier =
|
|
|
+
|
|
|
+ AMRMTokenIdentifierForTest newVersionTokenIdentifier =
|
|
|
new AMRMTokenIdentifierForTest(amrmToken_2.decodeIdentifier(), "message");
|
|
|
-
|
|
|
+
|
|
|
assertEquals("Message is changed after set to newVersionTokenIdentifier",
|
|
|
"message", newVersionTokenIdentifier.getMessage());
|
|
|
- org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newVersionToken =
|
|
|
+ org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newVersionToken =
|
|
|
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> (
|
|
|
- newVersionTokenIdentifier.getBytes(),
|
|
|
+ newVersionTokenIdentifier.getBytes(),
|
|
|
amrmTokenSecretManager.retrievePassword(newVersionTokenIdentifier),
|
|
|
newVersionTokenIdentifier.getKind(), new Text());
|
|
|
-
|
|
|
+
|
|
|
SecurityUtil.setTokenService(newVersionToken, yarnCluster
|
|
|
- .getResourceManager().getApplicationMasterService().getBindAddress());
|
|
|
+ .getResourceManager().getApplicationMasterService().getBindAddress());
|
|
|
testUser1.addToken(newVersionToken);
|
|
|
-
|
|
|
+
|
|
|
AllocateRequest request = Records.newRecord(AllocateRequest.class);
|
|
|
request.setResponseId(response.getResponseId());
|
|
|
testUser1.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
|
|
|
@Override
|
|
|
public ApplicationMasterProtocol run() {
|
|
|
return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy(
|
|
|
- ApplicationMasterProtocol.class,
|
|
|
- yarnCluster.getResourceManager().getApplicationMasterService()
|
|
|
- .getBindAddress(), conf);
|
|
|
+ ApplicationMasterProtocol.class,
|
|
|
+ yarnCluster.getResourceManager().getApplicationMasterService()
|
|
|
+ .getBindAddress(), conf);
|
|
|
}
|
|
|
}).allocate(request);
|
|
|
|
|
@@ -1853,12 +1853,12 @@ public class TestAMRMClient {
|
|
|
// and can not use this rolled-over token to make a allocate all.
|
|
|
while (true) {
|
|
|
if (amrmToken_2.decodeIdentifier().getKeyId() != amrmTokenSecretManager
|
|
|
- .getCurrnetMasterKeyData().getMasterKey().getKeyId()) {
|
|
|
+ .getCurrnetMasterKeyData().getMasterKey().getKeyId()) {
|
|
|
if (amrmTokenSecretManager.getNextMasterKeyData() == null) {
|
|
|
break;
|
|
|
} else if (amrmToken_2.decodeIdentifier().getKeyId() !=
|
|
|
amrmTokenSecretManager.getNextMasterKeyData().getMasterKey()
|
|
|
- .getKeyId()) {
|
|
|
+ .getKeyId()) {
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
@@ -1870,27 +1870,27 @@ public class TestAMRMClient {
|
|
|
UserGroupInformation testUser2 =
|
|
|
UserGroupInformation.createRemoteUser("testUser2");
|
|
|
SecurityUtil.setTokenService(amrmToken_2, yarnCluster
|
|
|
- .getResourceManager().getApplicationMasterService().getBindAddress());
|
|
|
+ .getResourceManager().getApplicationMasterService().getBindAddress());
|
|
|
testUser2.addToken(amrmToken_2);
|
|
|
testUser2.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
|
|
|
@Override
|
|
|
public ApplicationMasterProtocol run() {
|
|
|
return (ApplicationMasterProtocol) YarnRPC.create(conf).getProxy(
|
|
|
- ApplicationMasterProtocol.class,
|
|
|
- yarnCluster.getResourceManager().getApplicationMasterService()
|
|
|
- .getBindAddress(), conf);
|
|
|
+ ApplicationMasterProtocol.class,
|
|
|
+ yarnCluster.getResourceManager().getApplicationMasterService()
|
|
|
+ .getBindAddress(), conf);
|
|
|
}
|
|
|
}).allocate(Records.newRecord(AllocateRequest.class));
|
|
|
fail("The old Token should not work");
|
|
|
} catch (Exception ex) {
|
|
|
assertTrue(ex instanceof InvalidToken);
|
|
|
assertTrue(ex.getMessage().contains(
|
|
|
- "Invalid AMRMToken from "
|
|
|
- + amrmToken_2.decodeIdentifier().getApplicationAttemptId()));
|
|
|
+ "Invalid AMRMToken from "
|
|
|
+ + amrmToken_2.decodeIdentifier().getApplicationAttemptId()));
|
|
|
}
|
|
|
|
|
|
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
|
|
|
- null, null);
|
|
|
+ null, null);
|
|
|
|
|
|
} finally {
|
|
|
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
|
|
@@ -1901,7 +1901,7 @@ public class TestAMRMClient {
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
private org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>
|
|
|
- getAMRMToken() throws IOException {
|
|
|
+ getAMRMToken() throws IOException {
|
|
|
Credentials credentials =
|
|
|
UserGroupInformation.getCurrentUser().getCredentials();
|
|
|
Iterator<org.apache.hadoop.security.token.Token<?>> iter =
|