|
@@ -956,7 +956,6 @@ public class TestAMRMClient extends BaseAMRMClientTest{
|
|
|
return containers;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
private void doContainerResourceChange(
|
|
|
final AMRMClient<ContainerRequest> amClient, List<Container> containers)
|
|
|
throws YarnException, IOException {
|
|
@@ -986,38 +985,50 @@ public class TestAMRMClient extends BaseAMRMClientTest{
|
|
|
Resource.newInstance(512, 1), null));
|
|
|
assertEquals(Resource.newInstance(512, 1),
|
|
|
amClientImpl.change.get(container1.getId()).getValue().getCapability());
|
|
|
- // request resource increase for container2
|
|
|
- amClientImpl.requestContainerUpdate(container2,
|
|
|
- UpdateContainerRequest.newInstance(container2.getVersion(),
|
|
|
- container2.getId(), ContainerUpdateType.INCREASE_RESOURCE,
|
|
|
- Resource.newInstance(2048, 1), null));
|
|
|
- assertEquals(Resource.newInstance(2048, 1),
|
|
|
- amClientImpl.change.get(container2.getId()).getValue().getCapability());
|
|
|
// verify release request will cancel pending change requests for the same
|
|
|
// container
|
|
|
amClientImpl.requestContainerUpdate(container3,
|
|
|
UpdateContainerRequest.newInstance(container3.getVersion(),
|
|
|
container3.getId(), ContainerUpdateType.INCREASE_RESOURCE,
|
|
|
Resource.newInstance(2048, 1), null));
|
|
|
- assertEquals(3, amClientImpl.pendingChange.size());
|
|
|
- amClientImpl.releaseAssignedContainer(container3.getId());
|
|
|
assertEquals(2, amClientImpl.pendingChange.size());
|
|
|
+ amClientImpl.releaseAssignedContainer(container3.getId());
|
|
|
+ assertEquals(1, amClientImpl.pendingChange.size());
|
|
|
// as of now: container1 asks to decrease to (512, 1)
|
|
|
// container2 asks to increase to (2048, 1)
|
|
|
// send allocation requests
|
|
|
- AllocateResponse allocResponse = amClient.allocate(0.1f);
|
|
|
- assertEquals(0, amClientImpl.change.size());
|
|
|
- // we should get decrease confirmation right away
|
|
|
- List<UpdatedContainer> updatedContainers =
|
|
|
- allocResponse.getUpdatedContainers();
|
|
|
- assertEquals(1, updatedContainers.size());
|
|
|
// we should get increase allocation after the next NM's heartbeat to RM
|
|
|
- triggerSchedulingWithNMHeartBeat();
|
|
|
- // get allocations
|
|
|
- allocResponse = amClient.allocate(0.1f);
|
|
|
- updatedContainers =
|
|
|
- allocResponse.getUpdatedContainers();
|
|
|
- assertEquals(1, updatedContainers.size());
|
|
|
+ assertUpdatedContainers(amClient, container1);
|
|
|
+ // request resource increase for container2
|
|
|
+ amClientImpl.requestContainerUpdate(container2,
|
|
|
+ UpdateContainerRequest.newInstance(container2.getVersion(),
|
|
|
+ container2.getId(), ContainerUpdateType.INCREASE_RESOURCE,
|
|
|
+ Resource.newInstance(2048, 1), null));
|
|
|
+ assertEquals(Resource.newInstance(2048, 1),
|
|
|
+ amClientImpl.change.get(container2.getId()).getValue().getCapability());
|
|
|
+ assertUpdatedContainers(amClient, container2);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void assertUpdatedContainers(AMRMClient<ContainerRequest> amClient, Container container) {
|
|
|
+ RMContext context = yarnCluster.getResourceManager().getRMContext();
|
|
|
+ RMNode rmNode = context.getRMNodes().get(container.getNodeId());
|
|
|
+ List<UpdatedContainer> updateResponse = new ArrayList<>();
|
|
|
+ int allocationAttempts = 0;
|
|
|
+ while (allocationAttempts < 1000) {
|
|
|
+ context.getScheduler().handle(new NodeUpdateSchedulerEvent(rmNode));
|
|
|
+ try {
|
|
|
+ updateResponse = amClient.allocate(0.1f).getUpdatedContainers();
|
|
|
+ if (updateResponse.size() == 1) {
|
|
|
+ break;
|
|
|
+ } else {
|
|
|
+ allocationAttempts++;
|
|
|
+ sleep(20);
|
|
|
+ }
|
|
|
+ } catch (Exception ignored) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Assert.assertEquals("Container resource change update failed", 1, updateResponse.size());
|
|
|
}
|
|
|
|
|
|
@Test
|