|
@@ -33,6 +33,7 @@ import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
import java.util.function.Supplier;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
@@ -51,6 +52,7 @@ import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
|
import org.apache.hadoop.service.Service.STATE;
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
@@ -68,6 +70,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
@@ -96,6 +99,7 @@ import org.eclipse.jetty.util.log.Log;
|
|
|
@RunWith(value = Parameterized.class)
|
|
|
public class TestAMRMClient {
|
|
|
private String schedulerName = null;
|
|
|
+ private boolean autoUpdate = false;
|
|
|
private Configuration conf = null;
|
|
|
private MiniYARNCluster yarnCluster = null;
|
|
|
private YarnClient yarnClient = null;
|
|
@@ -115,16 +119,19 @@ public class TestAMRMClient {
|
|
|
private String[] racks;
|
|
|
private final static int DEFAULT_ITERATION = 3;
|
|
|
|
|
|
- public TestAMRMClient(String schedulerName) {
|
|
|
+ public TestAMRMClient(String schedulerName, boolean autoUpdate) {
|
|
|
this.schedulerName = schedulerName;
|
|
|
+ this.autoUpdate = autoUpdate;
|
|
|
}
|
|
|
|
|
|
@Parameterized.Parameters
|
|
|
public static Collection<Object[]> data() {
|
|
|
- List<Object[]> list = new ArrayList<Object[]>(2);
|
|
|
- list.add(new Object[] {CapacityScheduler.class.getName()});
|
|
|
- list.add(new Object[] {FairScheduler.class.getName()});
|
|
|
- return list;
|
|
|
+ // Currently only capacity scheduler supports auto update.
|
|
|
+ return Arrays.asList(new Object[][] {
|
|
|
+ {CapacityScheduler.class.getName(), true},
|
|
|
+ {CapacityScheduler.class.getName(), false},
|
|
|
+ {FairScheduler.class.getName(), false}
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
@Before
|
|
@@ -135,6 +142,9 @@ public class TestAMRMClient {
|
|
|
|
|
|
private void createClusterAndStartApplication() throws Exception {
|
|
|
// start minicluster
|
|
|
+ if (autoUpdate) {
|
|
|
+ conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true);
|
|
|
+ }
|
|
|
conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName);
|
|
|
conf.setLong(
|
|
|
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
|
|
@@ -1150,6 +1160,139 @@ public class TestAMRMClient {
|
|
|
assertEquals(1, updatedContainers.size());
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testAMRMContainerPromotionAndDemotionWithAutoUpdate()
|
|
|
+ throws Exception {
|
|
|
+ AMRMClientImpl<AMRMClient.ContainerRequest> amClient =
|
|
|
+ (AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient
|
|
|
+ .createAMRMClient();
|
|
|
+ amClient.init(conf);
|
|
|
+ amClient.start();
|
|
|
+
|
|
|
+ // start am nm client
|
|
|
+ NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient();
|
|
|
+ Assert.assertNotNull(nmClient);
|
|
|
+ nmClient.init(conf);
|
|
|
+ nmClient.start();
|
|
|
+ assertEquals(STATE.STARTED, nmClient.getServiceState());
|
|
|
+
|
|
|
+ amClient.registerApplicationMaster("Host", 10000, "");
|
|
|
+
|
|
|
+ // setup container request
|
|
|
+ assertEquals(0, amClient.ask.size());
|
|
|
+ assertEquals(0, amClient.release.size());
|
|
|
+
|
|
|
+ // START OPPORTUNISTIC Container, Send allocation request to RM
|
|
|
+ Resource reqResource = Resource.newInstance(512, 1);
|
|
|
+ amClient.addContainerRequest(
|
|
|
+ new AMRMClient.ContainerRequest(reqResource, null, null, priority2, 0,
|
|
|
+ true, null, ExecutionTypeRequest
|
|
|
+ .newInstance(ExecutionType.OPPORTUNISTIC, true)));
|
|
|
+
|
|
|
+ // RM should allocate container within 1 calls to allocate()
|
|
|
+ AllocateResponse allocResponse = waitForAllocation(amClient, 1, 0);
|
|
|
+
|
|
|
+ assertEquals(1, allocResponse.getAllocatedContainers().size());
|
|
|
+ startContainer(allocResponse, nmClient);
|
|
|
+
|
|
|
+ Container c = allocResponse.getAllocatedContainers().get(0);
|
|
|
+ amClient.requestContainerUpdate(c,
|
|
|
+ UpdateContainerRequest.newInstance(c.getVersion(),
|
|
|
+ c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
|
|
|
+ null, ExecutionType.GUARANTEED));
|
|
|
+
|
|
|
+ allocResponse = waitForAllocation(amClient, 0, 1);
|
|
|
+
|
|
|
+ // Make sure container is updated.
|
|
|
+ UpdatedContainer updatedContainer = allocResponse
|
|
|
+ .getUpdatedContainers().get(0);
|
|
|
+
|
|
|
+ // If container auto update is not enabled, we need to notify
|
|
|
+ // NM about this update.
|
|
|
+ if (!autoUpdate) {
|
|
|
+ nmClient.updateContainerResource(updatedContainer.getContainer());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Wait until NM context updated, or fail on timeout.
|
|
|
+ waitForNMContextUpdate(updatedContainer, ExecutionType.GUARANTEED);
|
|
|
+
|
|
|
+ // Once promoted, demote it back to OPPORTUNISTIC
|
|
|
+ amClient.requestContainerUpdate(updatedContainer.getContainer(),
|
|
|
+ UpdateContainerRequest.newInstance(
|
|
|
+ updatedContainer.getContainer().getVersion(),
|
|
|
+ updatedContainer.getContainer().getId(),
|
|
|
+ ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
|
|
|
+ null, ExecutionType.OPPORTUNISTIC));
|
|
|
+
|
|
|
+ allocResponse = waitForAllocation(amClient, 0, 1);
|
|
|
+
|
|
|
+ // Make sure container is updated.
|
|
|
+ updatedContainer = allocResponse.getUpdatedContainers().get(0);
|
|
|
+
|
|
|
+ if (!autoUpdate) {
|
|
|
+ nmClient.updateContainerResource(updatedContainer.getContainer());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Wait until NM context updated, or fail on timeout.
|
|
|
+ waitForNMContextUpdate(updatedContainer, ExecutionType.OPPORTUNISTIC);
|
|
|
+
|
|
|
+ amClient.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ private AllocateResponse waitForAllocation(AMRMClient amrmClient,
|
|
|
+ int expectedAllocatedContainerNum, int expectedUpdatedContainerNum)
|
|
|
+ throws Exception {
|
|
|
+ AllocateResponse allocResponse = null;
|
|
|
+ int iteration = 100;
|
|
|
+ while(iteration>0) {
|
|
|
+ allocResponse = amrmClient.allocate(0.1f);
|
|
|
+ int actualAllocated = allocResponse.getAllocatedContainers().size();
|
|
|
+ int actualUpdated = allocResponse.getUpdatedContainers().size();
|
|
|
+ if (expectedAllocatedContainerNum == actualAllocated &&
|
|
|
+ expectedUpdatedContainerNum == actualUpdated) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ Thread.sleep(100);
|
|
|
+ iteration--;
|
|
|
+ }
|
|
|
+ return allocResponse;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void waitForNMContextUpdate(UpdatedContainer updatedContainer,
|
|
|
+ ExecutionType expectedType) {
|
|
|
+ for (int i=0; i<nodeCount; i++) {
|
|
|
+ NodeManager nm = yarnCluster.getNodeManager(i);
|
|
|
+ if (nm.getNMContext().getNodeId()
|
|
|
+ .equals(updatedContainer.getContainer().getNodeId())) {
|
|
|
+ try {
|
|
|
+ GenericTestUtils.waitFor(() -> {
|
|
|
+ org.apache.hadoop.yarn.server.nodemanager.containermanager
|
|
|
+ .container.Container nmContainer =
|
|
|
+ nm.getNMContext().getContainers()
|
|
|
+ .get(updatedContainer.getContainer().getId());
|
|
|
+ if (nmContainer != null) {
|
|
|
+ ExecutionType actual = nmContainer.getContainerTokenIdentifier()
|
|
|
+ .getExecutionType();
|
|
|
+ return actual.equals(expectedType);
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }, 1000, 30000);
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ fail("Times out waiting for container state in"
|
|
|
+ + " NM context to be updated");
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // Ignorable.
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Iterated all nodes but still can't get a match
|
|
|
+ if (i == nodeCount -1) {
|
|
|
+ fail("Container doesn't exist in NM context.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test(timeout=60000)
|
|
|
public void testAMRMClientWithContainerPromotion()
|
|
|
throws YarnException, IOException {
|
|
@@ -1435,7 +1578,9 @@ public class TestAMRMClient {
|
|
|
for (UpdatedContainer updatedContainer : allocResponse
|
|
|
.getUpdatedContainers()) {
|
|
|
Container container = updatedContainer.getContainer();
|
|
|
- nmClient.increaseContainerResource(container);
|
|
|
+ if (!autoUpdate) {
|
|
|
+ nmClient.increaseContainerResource(container);
|
|
|
+ }
|
|
|
// NodeManager may still need some time to get the stable
|
|
|
// container status
|
|
|
while (true) {
|