|
@@ -19,6 +19,7 @@
|
|
package org.apache.hadoop.yarn.client.api.impl;
|
|
package org.apache.hadoop.yarn.client.api.impl;
|
|
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
import static org.junit.Assert.assertEquals;
|
|
|
|
+import static org.junit.Assert.assertNotNull;
|
|
import static org.junit.Assert.assertTrue;
|
|
import static org.junit.Assert.assertTrue;
|
|
import static org.junit.Assert.fail;
|
|
import static org.junit.Assert.fail;
|
|
import static org.mockito.Matchers.any;
|
|
import static org.mockito.Matchers.any;
|
|
@@ -36,6 +37,7 @@ import java.util.HashMap;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.TreeSet;
|
|
import java.util.TreeSet;
|
|
|
|
|
|
@@ -142,6 +144,10 @@ public class TestAMRMClient {
|
|
// set the minimum allocation so that resource decrease can go under 1024
|
|
// set the minimum allocation so that resource decrease can go under 1024
|
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
|
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
|
|
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
|
|
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
|
|
|
|
+ conf.setBoolean(
|
|
|
|
+ YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
|
|
|
|
+ conf.setInt(
|
|
|
|
+ YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
|
|
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
|
|
yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
|
|
yarnCluster.init(conf);
|
|
yarnCluster.init(conf);
|
|
yarnCluster.start();
|
|
yarnCluster.start();
|
|
@@ -924,8 +930,8 @@ public class TestAMRMClient {
|
|
// add exp=x to ANY
|
|
// add exp=x to ANY
|
|
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
|
|
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
|
|
1), null, null, Priority.UNDEFINED, true, "x"));
|
|
1), null, null, Priority.UNDEFINED, true, "x"));
|
|
- Assert.assertEquals(1, client.ask.size());
|
|
|
|
- Assert.assertEquals("x", client.ask.iterator().next()
|
|
|
|
|
|
+ assertEquals(1, client.ask.size());
|
|
|
|
+ assertEquals("x", client.ask.iterator().next()
|
|
.getNodeLabelExpression());
|
|
.getNodeLabelExpression());
|
|
|
|
|
|
// add exp=x then add exp=a to ANY in same priority, only exp=a should kept
|
|
// add exp=x then add exp=a to ANY in same priority, only exp=a should kept
|
|
@@ -933,8 +939,8 @@ public class TestAMRMClient {
|
|
1), null, null, Priority.UNDEFINED, true, "x"));
|
|
1), null, null, Priority.UNDEFINED, true, "x"));
|
|
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
|
|
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
|
|
1), null, null, Priority.UNDEFINED, true, "a"));
|
|
1), null, null, Priority.UNDEFINED, true, "a"));
|
|
- Assert.assertEquals(1, client.ask.size());
|
|
|
|
- Assert.assertEquals("a", client.ask.iterator().next()
|
|
|
|
|
|
+ assertEquals(1, client.ask.size());
|
|
|
|
+ assertEquals("a", client.ask.iterator().next()
|
|
.getNodeLabelExpression());
|
|
.getNodeLabelExpression());
|
|
|
|
|
|
// add exp=x to ANY, rack and node, only resource request has ANY resource
|
|
// add exp=x to ANY, rack and node, only resource request has ANY resource
|
|
@@ -943,10 +949,10 @@ public class TestAMRMClient {
|
|
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
|
|
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
|
|
1), null, null, Priority.UNDEFINED, true,
|
|
1), null, null, Priority.UNDEFINED, true,
|
|
"y"));
|
|
"y"));
|
|
- Assert.assertEquals(1, client.ask.size());
|
|
|
|
|
|
+ assertEquals(1, client.ask.size());
|
|
for (ResourceRequest req : client.ask) {
|
|
for (ResourceRequest req : client.ask) {
|
|
if (ResourceRequest.ANY.equals(req.getResourceName())) {
|
|
if (ResourceRequest.ANY.equals(req.getResourceName())) {
|
|
- Assert.assertEquals("y", req.getNodeLabelExpression());
|
|
|
|
|
|
+ assertEquals("y", req.getNodeLabelExpression());
|
|
} else {
|
|
} else {
|
|
Assert.assertNull(req.getNodeLabelExpression());
|
|
Assert.assertNull(req.getNodeLabelExpression());
|
|
}
|
|
}
|
|
@@ -957,7 +963,7 @@ public class TestAMRMClient {
|
|
new String[] { "node1", "node2" }, Priority.UNDEFINED, true, "y"));
|
|
new String[] { "node1", "node2" }, Priority.UNDEFINED, true, "y"));
|
|
for (ResourceRequest req : client.ask) {
|
|
for (ResourceRequest req : client.ask) {
|
|
if (ResourceRequest.ANY.equals(req.getResourceName())) {
|
|
if (ResourceRequest.ANY.equals(req.getResourceName())) {
|
|
- Assert.assertEquals("y", req.getNodeLabelExpression());
|
|
|
|
|
|
+ assertEquals("y", req.getNodeLabelExpression());
|
|
} else {
|
|
} else {
|
|
Assert.assertNull(req.getNodeLabelExpression());
|
|
Assert.assertNull(req.getNodeLabelExpression());
|
|
}
|
|
}
|
|
@@ -971,7 +977,7 @@ public class TestAMRMClient {
|
|
} catch (InvalidContainerRequestException e) {
|
|
} catch (InvalidContainerRequestException e) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- Assert.fail();
|
|
|
|
|
|
+ fail();
|
|
}
|
|
}
|
|
|
|
|
|
@Test(timeout=30000)
|
|
@Test(timeout=30000)
|
|
@@ -1042,7 +1048,8 @@ public class TestAMRMClient {
|
|
// get allocations
|
|
// get allocations
|
|
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
|
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
|
List<Container> containers = allocResponse.getAllocatedContainers();
|
|
List<Container> containers = allocResponse.getAllocatedContainers();
|
|
- Assert.assertEquals(num, containers.size());
|
|
|
|
|
|
+ assertEquals(num, containers.size());
|
|
|
|
+
|
|
// build container launch context
|
|
// build container launch context
|
|
Credentials ts = new Credentials();
|
|
Credentials ts = new Credentials();
|
|
DataOutputBuffer dob = new DataOutputBuffer();
|
|
DataOutputBuffer dob = new DataOutputBuffer();
|
|
@@ -1083,14 +1090,14 @@ public class TestAMRMClient {
|
|
private void doContainerResourceChange(
|
|
private void doContainerResourceChange(
|
|
final AMRMClient<ContainerRequest> amClient, List<Container> containers)
|
|
final AMRMClient<ContainerRequest> amClient, List<Container> containers)
|
|
throws YarnException, IOException {
|
|
throws YarnException, IOException {
|
|
- Assert.assertEquals(3, containers.size());
|
|
|
|
|
|
+ assertEquals(3, containers.size());
|
|
// remember the container IDs
|
|
// remember the container IDs
|
|
Container container1 = containers.get(0);
|
|
Container container1 = containers.get(0);
|
|
Container container2 = containers.get(1);
|
|
Container container2 = containers.get(1);
|
|
Container container3 = containers.get(2);
|
|
Container container3 = containers.get(2);
|
|
AMRMClientImpl<ContainerRequest> amClientImpl =
|
|
AMRMClientImpl<ContainerRequest> amClientImpl =
|
|
(AMRMClientImpl<ContainerRequest>) amClient;
|
|
(AMRMClientImpl<ContainerRequest>) amClient;
|
|
- Assert.assertEquals(0, amClientImpl.change.size());
|
|
|
|
|
|
+ assertEquals(0, amClientImpl.change.size());
|
|
// verify newer request overwrites older request for the container1
|
|
// verify newer request overwrites older request for the container1
|
|
amClientImpl.requestContainerUpdate(container1,
|
|
amClientImpl.requestContainerUpdate(container1,
|
|
UpdateContainerRequest.newInstance(container1.getVersion(),
|
|
UpdateContainerRequest.newInstance(container1.getVersion(),
|
|
@@ -1100,21 +1107,21 @@ public class TestAMRMClient {
|
|
UpdateContainerRequest.newInstance(container1.getVersion(),
|
|
UpdateContainerRequest.newInstance(container1.getVersion(),
|
|
container1.getId(), ContainerUpdateType.INCREASE_RESOURCE,
|
|
container1.getId(), ContainerUpdateType.INCREASE_RESOURCE,
|
|
Resource.newInstance(4096, 1), null));
|
|
Resource.newInstance(4096, 1), null));
|
|
- Assert.assertEquals(Resource.newInstance(4096, 1),
|
|
|
|
|
|
+ assertEquals(Resource.newInstance(4096, 1),
|
|
amClientImpl.change.get(container1.getId()).getValue().getCapability());
|
|
amClientImpl.change.get(container1.getId()).getValue().getCapability());
|
|
// verify new decrease request cancels old increase request for container1
|
|
// verify new decrease request cancels old increase request for container1
|
|
amClientImpl.requestContainerUpdate(container1,
|
|
amClientImpl.requestContainerUpdate(container1,
|
|
UpdateContainerRequest.newInstance(container1.getVersion(),
|
|
UpdateContainerRequest.newInstance(container1.getVersion(),
|
|
container1.getId(), ContainerUpdateType.DECREASE_RESOURCE,
|
|
container1.getId(), ContainerUpdateType.DECREASE_RESOURCE,
|
|
Resource.newInstance(512, 1), null));
|
|
Resource.newInstance(512, 1), null));
|
|
- Assert.assertEquals(Resource.newInstance(512, 1),
|
|
|
|
|
|
+ assertEquals(Resource.newInstance(512, 1),
|
|
amClientImpl.change.get(container1.getId()).getValue().getCapability());
|
|
amClientImpl.change.get(container1.getId()).getValue().getCapability());
|
|
// request resource increase for container2
|
|
// request resource increase for container2
|
|
amClientImpl.requestContainerUpdate(container2,
|
|
amClientImpl.requestContainerUpdate(container2,
|
|
UpdateContainerRequest.newInstance(container2.getVersion(),
|
|
UpdateContainerRequest.newInstance(container2.getVersion(),
|
|
container2.getId(), ContainerUpdateType.INCREASE_RESOURCE,
|
|
container2.getId(), ContainerUpdateType.INCREASE_RESOURCE,
|
|
Resource.newInstance(2048, 1), null));
|
|
Resource.newInstance(2048, 1), null));
|
|
- Assert.assertEquals(Resource.newInstance(2048, 1),
|
|
|
|
|
|
+ assertEquals(Resource.newInstance(2048, 1),
|
|
amClientImpl.change.get(container2.getId()).getValue().getCapability());
|
|
amClientImpl.change.get(container2.getId()).getValue().getCapability());
|
|
// verify release request will cancel pending change requests for the same
|
|
// verify release request will cancel pending change requests for the same
|
|
// container
|
|
// container
|
|
@@ -1122,27 +1129,357 @@ public class TestAMRMClient {
|
|
UpdateContainerRequest.newInstance(container3.getVersion(),
|
|
UpdateContainerRequest.newInstance(container3.getVersion(),
|
|
container3.getId(), ContainerUpdateType.INCREASE_RESOURCE,
|
|
container3.getId(), ContainerUpdateType.INCREASE_RESOURCE,
|
|
Resource.newInstance(2048, 1), null));
|
|
Resource.newInstance(2048, 1), null));
|
|
- Assert.assertEquals(3, amClientImpl.pendingChange.size());
|
|
|
|
|
|
+ assertEquals(3, amClientImpl.pendingChange.size());
|
|
amClientImpl.releaseAssignedContainer(container3.getId());
|
|
amClientImpl.releaseAssignedContainer(container3.getId());
|
|
- Assert.assertEquals(2, amClientImpl.pendingChange.size());
|
|
|
|
|
|
+ assertEquals(2, amClientImpl.pendingChange.size());
|
|
// as of now: container1 asks to decrease to (512, 1)
|
|
// as of now: container1 asks to decrease to (512, 1)
|
|
// container2 asks to increase to (2048, 1)
|
|
// container2 asks to increase to (2048, 1)
|
|
// send allocation requests
|
|
// send allocation requests
|
|
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
|
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
|
- Assert.assertEquals(0, amClientImpl.change.size());
|
|
|
|
|
|
+ assertEquals(0, amClientImpl.change.size());
|
|
// we should get decrease confirmation right away
|
|
// we should get decrease confirmation right away
|
|
List<UpdatedContainer> updatedContainers =
|
|
List<UpdatedContainer> updatedContainers =
|
|
allocResponse.getUpdatedContainers();
|
|
allocResponse.getUpdatedContainers();
|
|
- Assert.assertEquals(1, updatedContainers.size());
|
|
|
|
|
|
+ assertEquals(1, updatedContainers.size());
|
|
// we should get increase allocation after the next NM's heartbeat to RM
|
|
// we should get increase allocation after the next NM's heartbeat to RM
|
|
triggerSchedulingWithNMHeartBeat();
|
|
triggerSchedulingWithNMHeartBeat();
|
|
// get allocations
|
|
// get allocations
|
|
allocResponse = amClient.allocate(0.1f);
|
|
allocResponse = amClient.allocate(0.1f);
|
|
updatedContainers =
|
|
updatedContainers =
|
|
allocResponse.getUpdatedContainers();
|
|
allocResponse.getUpdatedContainers();
|
|
- Assert.assertEquals(1, updatedContainers.size());
|
|
|
|
|
|
+ assertEquals(1, updatedContainers.size());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test(timeout=60000)
|
|
|
|
+ public void testAMRMClientWithContainerPromotion()
|
|
|
|
+ throws YarnException, IOException {
|
|
|
|
+ AMRMClientImpl<AMRMClient.ContainerRequest> amClient =
|
|
|
|
+ (AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient
|
|
|
|
+ .createAMRMClient();
|
|
|
|
+ //asserting we are not using the singleton instance cache
|
|
|
|
+ Assert.assertSame(NMTokenCache.getSingleton(),
|
|
|
|
+ amClient.getNMTokenCache());
|
|
|
|
+ amClient.init(conf);
|
|
|
|
+ amClient.start();
|
|
|
|
+
|
|
|
|
+ // start am nm client
|
|
|
|
+ NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient();
|
|
|
|
+ Assert.assertNotNull(nmClient);
|
|
|
|
+ // asserting we are using the singleton instance cache
|
|
|
|
+ Assert.assertSame(
|
|
|
|
+ NMTokenCache.getSingleton(), nmClient.getNMTokenCache());
|
|
|
|
+ nmClient.init(conf);
|
|
|
|
+ nmClient.start();
|
|
|
|
+ assertEquals(STATE.STARTED, nmClient.getServiceState());
|
|
|
|
+
|
|
|
|
+ amClient.registerApplicationMaster("Host", 10000, "");
|
|
|
|
+ // setup container request
|
|
|
|
+ assertEquals(0, amClient.ask.size());
|
|
|
|
+ assertEquals(0, amClient.release.size());
|
|
|
|
+
|
|
|
|
+ // START OPPORTUNISTIC Container, Send allocation request to RM
|
|
|
|
+ amClient.addContainerRequest(
|
|
|
|
+ new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
|
|
|
|
+ true, null, ExecutionTypeRequest
|
|
|
|
+ .newInstance(ExecutionType.OPPORTUNISTIC, true)));
|
|
|
|
+
|
|
|
|
+ int oppContainersRequestedAny =
|
|
|
|
+ amClient.getTable(0).get(priority2, ResourceRequest.ANY,
|
|
|
|
+ ExecutionType.OPPORTUNISTIC, capability).remoteRequest
|
|
|
|
+ .getNumContainers();
|
|
|
|
+
|
|
|
|
+ assertEquals(1, oppContainersRequestedAny);
|
|
|
|
+ assertEquals(1, amClient.ask.size());
|
|
|
|
+ assertEquals(0, amClient.release.size());
|
|
|
|
+
|
|
|
|
+ // RM should allocate container within 2 calls to allocate()
|
|
|
|
+ int allocatedContainerCount = 0;
|
|
|
|
+ Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>();
|
|
|
|
+ int iterationsLeft = 50;
|
|
|
|
+
|
|
|
|
+ amClient.getNMTokenCache().clearCache();
|
|
|
|
+ assertEquals(0,
|
|
|
|
+ amClient.getNMTokenCache().numberOfTokensInCache());
|
|
|
|
+
|
|
|
|
+ AllocateResponse allocResponse = null;
|
|
|
|
+ while (allocatedContainerCount < oppContainersRequestedAny
|
|
|
|
+ && iterationsLeft-- > 0) {
|
|
|
|
+ allocResponse = amClient.allocate(0.1f);
|
|
|
|
+ // let NM heartbeat to RM and trigger allocations
|
|
|
|
+ //triggerSchedulingWithNMHeartBeat();
|
|
|
|
+ assertEquals(0, amClient.ask.size());
|
|
|
|
+ assertEquals(0, amClient.release.size());
|
|
|
|
+
|
|
|
|
+ allocatedContainerCount +=
|
|
|
|
+ allocResponse.getAllocatedContainers().size();
|
|
|
|
+ for (Container container : allocResponse.getAllocatedContainers()) {
|
|
|
|
+ if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
|
|
|
|
+ allocatedOpportContainers.put(container.getId(), container);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (allocatedContainerCount < oppContainersRequestedAny) {
|
|
|
|
+ // sleep to let NM's heartbeat to RM and trigger allocations
|
|
|
|
+ sleep(100);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ assertEquals(oppContainersRequestedAny, allocatedContainerCount);
|
|
|
|
+ assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
|
|
|
|
+
|
|
|
|
+ startContainer(allocResponse, nmClient);
|
|
|
|
+
|
|
|
|
+ // SEND PROMOTION REQUEST TO RM
|
|
|
|
+ try {
|
|
|
|
+ Container c = allocatedOpportContainers.values().iterator().next();
|
|
|
|
+ amClient.requestContainerUpdate(
|
|
|
|
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
|
|
|
|
+ c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
|
|
|
|
+ null, ExecutionType.OPPORTUNISTIC));
|
|
|
|
+ fail("Should throw Exception..");
|
|
|
|
+ } catch (IllegalArgumentException e) {
|
|
|
|
+ System.out.println("## " + e.getMessage());
|
|
|
|
+ assertTrue(e.getMessage().contains(
|
|
|
|
+ "target should be GUARANTEED and original should be OPPORTUNISTIC"));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ Container c = allocatedOpportContainers.values().iterator().next();
|
|
|
|
+ amClient.requestContainerUpdate(
|
|
|
|
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
|
|
|
|
+ c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
|
|
|
|
+ null, ExecutionType.GUARANTEED));
|
|
|
|
+ iterationsLeft = 120;
|
|
|
|
+ Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
|
|
|
|
+ // do a few iterations to ensure RM is not going to send new containers
|
|
|
|
+ while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
|
|
|
|
+ // inform RM of rejection
|
|
|
|
+ allocResponse = amClient.allocate(0.1f);
|
|
|
|
+ // RM did not send new containers because AM does not need any
|
|
|
|
+ if (allocResponse.getUpdatedContainers() != null) {
|
|
|
|
+ for (UpdatedContainer updatedContainer : allocResponse
|
|
|
|
+ .getUpdatedContainers()) {
|
|
|
|
+ System.out.println("Got update..");
|
|
|
|
+ updatedContainers.put(updatedContainer.getContainer().getId(),
|
|
|
|
+ updatedContainer);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (iterationsLeft > 0) {
|
|
|
|
+ // sleep to make sure NM's heartbeat
|
|
|
|
+ sleep(100);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ assertEquals(1, updatedContainers.size());
|
|
|
|
+
|
|
|
|
+ for (ContainerId cId : allocatedOpportContainers.keySet()) {
|
|
|
|
+ Container orig = allocatedOpportContainers.get(cId);
|
|
|
|
+ UpdatedContainer updatedContainer = updatedContainers.get(cId);
|
|
|
|
+ assertNotNull(updatedContainer);
|
|
|
|
+ assertEquals(ExecutionType.GUARANTEED,
|
|
|
|
+ updatedContainer.getContainer().getExecutionType());
|
|
|
|
+ assertEquals(orig.getResource(),
|
|
|
|
+ updatedContainer.getContainer().getResource());
|
|
|
|
+ assertEquals(orig.getNodeId(),
|
|
|
|
+ updatedContainer.getContainer().getNodeId());
|
|
|
|
+ assertEquals(orig.getVersion() + 1,
|
|
|
|
+ updatedContainer.getContainer().getVersion());
|
|
|
|
+ }
|
|
|
|
+ assertEquals(0, amClient.ask.size());
|
|
|
|
+ assertEquals(0, amClient.release.size());
|
|
|
|
+
|
|
|
|
+ // SEND UPDATE EXECTYPE UPDATE TO NM
|
|
|
|
+ updateContainerExecType(allocResponse, ExecutionType.GUARANTEED, nmClient);
|
|
|
|
+
|
|
|
|
+ amClient.ask.clear();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test(timeout=60000)
|
|
|
|
+ public void testAMRMClientWithContainerDemotion()
|
|
|
|
+ throws YarnException, IOException {
|
|
|
|
+ AMRMClientImpl<AMRMClient.ContainerRequest> amClient =
|
|
|
|
+ (AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient
|
|
|
|
+ .createAMRMClient();
|
|
|
|
+ //asserting we are not using the singleton instance cache
|
|
|
|
+ Assert.assertSame(NMTokenCache.getSingleton(),
|
|
|
|
+ amClient.getNMTokenCache());
|
|
|
|
+ amClient.init(conf);
|
|
|
|
+ amClient.start();
|
|
|
|
+
|
|
|
|
+ NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient();
|
|
|
|
+ Assert.assertNotNull(nmClient);
|
|
|
|
+ // asserting we are using the singleton instance cache
|
|
|
|
+ Assert.assertSame(
|
|
|
|
+ NMTokenCache.getSingleton(), nmClient.getNMTokenCache());
|
|
|
|
+ nmClient.init(conf);
|
|
|
|
+ nmClient.start();
|
|
|
|
+ assertEquals(STATE.STARTED, nmClient.getServiceState());
|
|
|
|
+
|
|
|
|
+ amClient.registerApplicationMaster("Host", 10000, "");
|
|
|
|
+ assertEquals(0, amClient.ask.size());
|
|
|
|
+ assertEquals(0, amClient.release.size());
|
|
|
|
+
|
|
|
|
+ // START OPPORTUNISTIC Container, Send allocation request to RM
|
|
|
|
+ amClient.addContainerRequest(
|
|
|
|
+ new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
|
|
|
|
+ true, null, ExecutionTypeRequest
|
|
|
|
+ .newInstance(ExecutionType.GUARANTEED, true)));
|
|
|
|
+
|
|
|
|
+ int oppContainersRequestedAny =
|
|
|
|
+ amClient.getTable(0).get(priority2, ResourceRequest.ANY,
|
|
|
|
+ ExecutionType.GUARANTEED, capability).remoteRequest
|
|
|
|
+ .getNumContainers();
|
|
|
|
+
|
|
|
|
+ assertEquals(1, oppContainersRequestedAny);
|
|
|
|
+ assertEquals(1, amClient.ask.size());
|
|
|
|
+ assertEquals(0, amClient.release.size());
|
|
|
|
+
|
|
|
|
+ // RM should allocate container within 2 calls to allocate()
|
|
|
|
+ int allocatedContainerCount = 0;
|
|
|
|
+ Map<ContainerId, Container> allocatedGuaranteedContainers = new HashMap<>();
|
|
|
|
+ int iterationsLeft = 50;
|
|
|
|
+
|
|
|
|
+ amClient.getNMTokenCache().clearCache();
|
|
|
|
+ assertEquals(0,
|
|
|
|
+ amClient.getNMTokenCache().numberOfTokensInCache());
|
|
|
|
+
|
|
|
|
+ AllocateResponse allocResponse = null;
|
|
|
|
+ while (allocatedContainerCount < oppContainersRequestedAny
|
|
|
|
+ && iterationsLeft-- > 0) {
|
|
|
|
+ allocResponse = amClient.allocate(0.1f);
|
|
|
|
+ // let NM heartbeat to RM and trigger allocations
|
|
|
|
+ //triggerSchedulingWithNMHeartBeat();
|
|
|
|
+ assertEquals(0, amClient.ask.size());
|
|
|
|
+ assertEquals(0, amClient.release.size());
|
|
|
|
+
|
|
|
|
+ allocatedContainerCount +=
|
|
|
|
+ allocResponse.getAllocatedContainers().size();
|
|
|
|
+ for (Container container : allocResponse.getAllocatedContainers()) {
|
|
|
|
+ if (container.getExecutionType() == ExecutionType.GUARANTEED) {
|
|
|
|
+ allocatedGuaranteedContainers.put(container.getId(), container);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (allocatedContainerCount < oppContainersRequestedAny) {
|
|
|
|
+ // sleep to let NM's heartbeat to RM and trigger allocations
|
|
|
|
+ sleep(100);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ assertEquals(oppContainersRequestedAny, allocatedContainerCount);
|
|
|
|
+ assertEquals(oppContainersRequestedAny,
|
|
|
|
+ allocatedGuaranteedContainers.size());
|
|
|
|
+ startContainer(allocResponse, nmClient);
|
|
|
|
+
|
|
|
|
+ // SEND DEMOTION REQUEST TO RM
|
|
|
|
+ try {
|
|
|
|
+ Container c = allocatedGuaranteedContainers.values().iterator().next();
|
|
|
|
+ amClient.requestContainerUpdate(
|
|
|
|
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
|
|
|
|
+ c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
|
|
|
|
+ null, ExecutionType.GUARANTEED));
|
|
|
|
+ fail("Should throw Exception..");
|
|
|
|
+ } catch (IllegalArgumentException e) {
|
|
|
|
+ System.out.println("## " + e.getMessage());
|
|
|
|
+ assertTrue(e.getMessage().contains(
|
|
|
|
+ "target should be OPPORTUNISTIC and original should be GUARANTEED"));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ Container c = allocatedGuaranteedContainers.values().iterator().next();
|
|
|
|
+ amClient.requestContainerUpdate(
|
|
|
|
+ c, UpdateContainerRequest.newInstance(c.getVersion(),
|
|
|
|
+ c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
|
|
|
|
+ null, ExecutionType.OPPORTUNISTIC));
|
|
|
|
+ iterationsLeft = 120;
|
|
|
|
+ Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
|
|
|
|
+ // do a few iterations to ensure RM is not going to send new containers
|
|
|
|
+ while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
|
|
|
|
+ // inform RM of rejection
|
|
|
|
+ allocResponse = amClient.allocate(0.1f);
|
|
|
|
+ // RM did not send new containers because AM does not need any
|
|
|
|
+ if (allocResponse.getUpdatedContainers() != null) {
|
|
|
|
+ for (UpdatedContainer updatedContainer : allocResponse
|
|
|
|
+ .getUpdatedContainers()) {
|
|
|
|
+ System.out.println("Got update..");
|
|
|
|
+ updatedContainers.put(updatedContainer.getContainer().getId(),
|
|
|
|
+ updatedContainer);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (iterationsLeft > 0) {
|
|
|
|
+ // sleep to make sure NM's heartbeat
|
|
|
|
+ sleep(100);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ assertEquals(1, updatedContainers.size());
|
|
|
|
+
|
|
|
|
+ for (ContainerId cId : allocatedGuaranteedContainers.keySet()) {
|
|
|
|
+ Container orig = allocatedGuaranteedContainers.get(cId);
|
|
|
|
+ UpdatedContainer updatedContainer = updatedContainers.get(cId);
|
|
|
|
+ assertNotNull(updatedContainer);
|
|
|
|
+ assertEquals(ExecutionType.OPPORTUNISTIC,
|
|
|
|
+ updatedContainer.getContainer().getExecutionType());
|
|
|
|
+ assertEquals(orig.getResource(),
|
|
|
|
+ updatedContainer.getContainer().getResource());
|
|
|
|
+ assertEquals(orig.getNodeId(),
|
|
|
|
+ updatedContainer.getContainer().getNodeId());
|
|
|
|
+ assertEquals(orig.getVersion() + 1,
|
|
|
|
+ updatedContainer.getContainer().getVersion());
|
|
|
|
+ }
|
|
|
|
+ assertEquals(0, amClient.ask.size());
|
|
|
|
+ assertEquals(0, amClient.release.size());
|
|
|
|
+
|
|
|
|
+ updateContainerExecType(allocResponse, ExecutionType.OPPORTUNISTIC,
|
|
|
|
+ nmClient);
|
|
|
|
+ amClient.ask.clear();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void updateContainerExecType(AllocateResponse allocResponse,
|
|
|
|
+ ExecutionType expectedExecType, NMClientImpl nmClient)
|
|
|
|
+ throws IOException, YarnException {
|
|
|
|
+ for (UpdatedContainer updatedContainer : allocResponse
|
|
|
|
+ .getUpdatedContainers()) {
|
|
|
|
+ Container container = updatedContainer.getContainer();
|
|
|
|
+ nmClient.increaseContainerResource(container);
|
|
|
|
+ // NodeManager may still need some time to get the stable
|
|
|
|
+ // container status
|
|
|
|
+ while (true) {
|
|
|
|
+ ContainerStatus status = nmClient
|
|
|
|
+ .getContainerStatus(container.getId(), container.getNodeId());
|
|
|
|
+ if (status.getExecutionType() == expectedExecType) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ sleep(10);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void startContainer(AllocateResponse allocResponse,
|
|
|
|
+ NMClientImpl nmClient) throws IOException, YarnException {
|
|
|
|
+ // START THE CONTAINER IN NM
|
|
|
|
+ // build container launch context
|
|
|
|
+ Credentials ts = new Credentials();
|
|
|
|
+ DataOutputBuffer dob = new DataOutputBuffer();
|
|
|
|
+ ts.writeTokenStorageToStream(dob);
|
|
|
|
+ ByteBuffer securityTokens =
|
|
|
|
+ ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
|
|
|
+ // start a process long enough for increase/decrease action to take effect
|
|
|
|
+ ContainerLaunchContext clc = BuilderUtils.newContainerLaunchContext(
|
|
|
|
+ Collections.<String, LocalResource>emptyMap(),
|
|
|
|
+ new HashMap<String, String>(), Arrays.asList("sleep", "100"),
|
|
|
|
+ new HashMap<String, ByteBuffer>(), securityTokens,
|
|
|
|
+ new HashMap<ApplicationAccessType, String>());
|
|
|
|
+ // start the containers and make sure they are in RUNNING state
|
|
|
|
+ for (Container container : allocResponse.getAllocatedContainers()) {
|
|
|
|
+ nmClient.startContainer(container, clc);
|
|
|
|
+ // NodeManager may still need some time to get the stable
|
|
|
|
+ // container status
|
|
|
|
+ while (true) {
|
|
|
|
+ ContainerStatus status = nmClient
|
|
|
|
+ .getContainerStatus(container.getId(), container.getNodeId());
|
|
|
|
+ if (status.getState() == ContainerState.RUNNING) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ sleep(10);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
|
|
private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
|
|
throws YarnException, IOException {
|
|
throws YarnException, IOException {
|
|
// setup container request
|
|
// setup container request
|
|
@@ -1172,7 +1509,7 @@ public class TestAMRMClient {
|
|
Set<ContainerId> releases = new TreeSet<ContainerId>();
|
|
Set<ContainerId> releases = new TreeSet<ContainerId>();
|
|
|
|
|
|
amClient.getNMTokenCache().clearCache();
|
|
amClient.getNMTokenCache().clearCache();
|
|
- Assert.assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache());
|
|
|
|
|
|
+ assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache());
|
|
HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
|
|
HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
|
|
|
|
|
|
while (allocatedContainerCount < containersRequestedAny
|
|
while (allocatedContainerCount < containersRequestedAny
|
|
@@ -1192,7 +1529,7 @@ public class TestAMRMClient {
|
|
for (NMToken token : allocResponse.getNMTokens()) {
|
|
for (NMToken token : allocResponse.getNMTokens()) {
|
|
String nodeID = token.getNodeId().toString();
|
|
String nodeID = token.getNodeId().toString();
|
|
if (receivedNMTokens.containsKey(nodeID)) {
|
|
if (receivedNMTokens.containsKey(nodeID)) {
|
|
- Assert.fail("Received token again for : " + nodeID);
|
|
|
|
|
|
+ fail("Received token again for : " + nodeID);
|
|
}
|
|
}
|
|
receivedNMTokens.put(nodeID, token.getToken());
|
|
receivedNMTokens.put(nodeID, token.getToken());
|
|
}
|
|
}
|
|
@@ -1204,7 +1541,7 @@ public class TestAMRMClient {
|
|
}
|
|
}
|
|
|
|
|
|
// Should receive atleast 1 token
|
|
// Should receive atleast 1 token
|
|
- Assert.assertTrue(receivedNMTokens.size() > 0
|
|
|
|
|
|
+ assertTrue(receivedNMTokens.size() > 0
|
|
&& receivedNMTokens.size() <= nodeCount);
|
|
&& receivedNMTokens.size() <= nodeCount);
|
|
|
|
|
|
assertEquals(allocatedContainerCount, containersRequestedAny);
|
|
assertEquals(allocatedContainerCount, containersRequestedAny);
|
|
@@ -1444,7 +1781,7 @@ public class TestAMRMClient {
|
|
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_1 =
|
|
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_1 =
|
|
getAMRMToken();
|
|
getAMRMToken();
|
|
Assert.assertNotNull(amrmToken_1);
|
|
Assert.assertNotNull(amrmToken_1);
|
|
- Assert.assertEquals(amrmToken_1.decodeIdentifier().getKeyId(),
|
|
|
|
|
|
+ assertEquals(amrmToken_1.decodeIdentifier().getKeyId(),
|
|
amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
|
|
amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
|
|
|
|
|
|
// Wait for enough time and make sure the roll_over happens
|
|
// Wait for enough time and make sure the roll_over happens
|
|
@@ -1459,7 +1796,7 @@ public class TestAMRMClient {
|
|
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_2 =
|
|
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_2 =
|
|
getAMRMToken();
|
|
getAMRMToken();
|
|
Assert.assertNotNull(amrmToken_2);
|
|
Assert.assertNotNull(amrmToken_2);
|
|
- Assert.assertEquals(amrmToken_2.decodeIdentifier().getKeyId(),
|
|
|
|
|
|
+ assertEquals(amrmToken_2.decodeIdentifier().getKeyId(),
|
|
amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
|
|
amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
|
|
|
|
|
|
Assert.assertNotEquals(amrmToken_1, amrmToken_2);
|
|
Assert.assertNotEquals(amrmToken_1, amrmToken_2);
|
|
@@ -1474,7 +1811,7 @@ public class TestAMRMClient {
|
|
AMRMTokenIdentifierForTest newVersionTokenIdentifier =
|
|
AMRMTokenIdentifierForTest newVersionTokenIdentifier =
|
|
new AMRMTokenIdentifierForTest(amrmToken_2.decodeIdentifier(), "message");
|
|
new AMRMTokenIdentifierForTest(amrmToken_2.decodeIdentifier(), "message");
|
|
|
|
|
|
- Assert.assertEquals("Message is changed after set to newVersionTokenIdentifier",
|
|
|
|
|
|
+ assertEquals("Message is changed after set to newVersionTokenIdentifier",
|
|
"message", newVersionTokenIdentifier.getMessage());
|
|
"message", newVersionTokenIdentifier.getMessage());
|
|
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newVersionToken =
|
|
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newVersionToken =
|
|
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> (
|
|
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> (
|
|
@@ -1530,10 +1867,10 @@ public class TestAMRMClient {
|
|
.getBindAddress(), conf);
|
|
.getBindAddress(), conf);
|
|
}
|
|
}
|
|
}).allocate(Records.newRecord(AllocateRequest.class));
|
|
}).allocate(Records.newRecord(AllocateRequest.class));
|
|
- Assert.fail("The old Token should not work");
|
|
|
|
|
|
+ fail("The old Token should not work");
|
|
} catch (Exception ex) {
|
|
} catch (Exception ex) {
|
|
- Assert.assertTrue(ex instanceof InvalidToken);
|
|
|
|
- Assert.assertTrue(ex.getMessage().contains(
|
|
|
|
|
|
+ assertTrue(ex instanceof InvalidToken);
|
|
|
|
+ assertTrue(ex.getMessage().contains(
|
|
"Invalid AMRMToken from "
|
|
"Invalid AMRMToken from "
|
|
+ amrmToken_2.decodeIdentifier().getApplicationAttemptId()));
|
|
+ amrmToken_2.decodeIdentifier().getApplicationAttemptId()));
|
|
}
|
|
}
|
|
@@ -1560,7 +1897,7 @@ public class TestAMRMClient {
|
|
org.apache.hadoop.security.token.Token<?> token = iter.next();
|
|
org.apache.hadoop.security.token.Token<?> token = iter.next();
|
|
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
|
|
if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
|
|
if (result != null) {
|
|
if (result != null) {
|
|
- Assert.fail("credentials has more than one AMRM token."
|
|
|
|
|
|
+ fail("credentials has more than one AMRM token."
|
|
+ " token1: " + result + " token2: " + token);
|
|
+ " token1: " + result + " token2: " + token);
|
|
}
|
|
}
|
|
result = (org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>)
|
|
result = (org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>)
|