|
@@ -47,12 +47,15 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
import org.apache.hadoop.yarn.api.records.Token;
|
|
|
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
|
|
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
|
|
import org.apache.hadoop.yarn.client.api.YarnClient;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Assert;
|
|
@@ -64,10 +67,14 @@ import org.mockito.stubbing.Answer;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeSet;
|
|
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertNotNull;
|
|
|
+import static org.junit.Assert.assertNull;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
import static org.mockito.Matchers.any;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.when;
|
|
@@ -166,8 +173,18 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
|
|
|
Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
|
|
|
Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
|
|
|
|
|
|
- RMApp rmApp =
|
|
|
- cluster.getResourceManager().getRMContext().getRMApps().get(appId);
|
|
|
+ // Wait until the RM has been updated and verify
|
|
|
+ Map<ApplicationId, RMApp> rmApps =
|
|
|
+ cluster.getResourceManager().getRMContext().getRMApps();
|
|
|
+ boolean rmUpdated = false;
|
|
|
+ for (int i=0; i<10 && !rmUpdated; i++) {
|
|
|
+ sleep(100);
|
|
|
+ RMApp rmApp = rmApps.get(appId);
|
|
|
+ if (rmApp.getState() == RMAppState.RUNNING) {
|
|
|
+ rmUpdated = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ RMApp rmApp = rmApps.get(appId);
|
|
|
Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
|
|
|
|
|
|
LOG.info("testDistributedSchedulingE2E - Allocate");
|
|
@@ -207,6 +224,17 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
|
|
|
containerTokenIdentifier.getExecutionType());
|
|
|
}
|
|
|
|
|
|
+ // Check that the RM sees OPPORTUNISTIC containers
|
|
|
+ ResourceScheduler scheduler = cluster.getResourceManager()
|
|
|
+ .getResourceScheduler();
|
|
|
+ for (Container allocatedContainer : allocResponse
|
|
|
+ .getAllocatedContainers()) {
|
|
|
+ ContainerId containerId = allocatedContainer.getId();
|
|
|
+ RMContainer rmContainer = scheduler.getRMContainer(containerId);
|
|
|
+ Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
|
|
|
+ rmContainer.getExecutionType());
|
|
|
+ }
|
|
|
+
|
|
|
LOG.info("testDistributedSchedulingE2E - Finish");
|
|
|
}
|
|
|
|
|
@@ -518,6 +546,97 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Check if an AM can ask for opportunistic containers and get them.
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testAMOpportunistic() throws Exception {
|
|
|
+ // Basic container to request
|
|
|
+ Resource capability = Resource.newInstance(1024, 1);
|
|
|
+ Priority priority = Priority.newInstance(1);
|
|
|
+
|
|
|
+ // Get the cluster topology
|
|
|
+ List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
|
|
|
+ String node = nodeReports.get(0).getNodeId().getHost();
|
|
|
+ String rack = nodeReports.get(0).getRackName();
|
|
|
+ String[] nodes = new String[]{node};
|
|
|
+ String[] racks = new String[]{rack};
|
|
|
+
|
|
|
+ // Create an AM to request resources
|
|
|
+ AMRMClient<AMRMClient.ContainerRequest> amClient = null;
|
|
|
+ try {
|
|
|
+ amClient = new AMRMClientImpl<AMRMClient.ContainerRequest>(client);
|
|
|
+ amClient.init(yarnConf);
|
|
|
+ amClient.start();
|
|
|
+ amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
|
|
|
+
|
|
|
+ // AM requests an opportunistic container
|
|
|
+ ExecutionTypeRequest execTypeRequest =
|
|
|
+ ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
|
|
|
+ ContainerRequest containerRequest = new AMRMClient.ContainerRequest(
|
|
|
+ capability, nodes, racks, priority, true, null, execTypeRequest);
|
|
|
+ amClient.addContainerRequest(containerRequest);
|
|
|
+
|
|
|
+ // Wait until the container is allocated
|
|
|
+ ContainerId opportunisticContainerId = null;
|
|
|
+ for (int i=0; i<10 && opportunisticContainerId == null; i++) {
|
|
|
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
|
|
|
+ List<Container> allocatedContainers =
|
|
|
+ allocResponse.getAllocatedContainers();
|
|
|
+ for (Container allocatedContainer : allocatedContainers) {
|
|
|
+ // Check that this is the container we required
|
|
|
+ assertEquals(ExecutionType.OPPORTUNISTIC,
|
|
|
+ allocatedContainer.getExecutionType());
|
|
|
+ opportunisticContainerId = allocatedContainer.getId();
|
|
|
+ }
|
|
|
+ sleep(100);
|
|
|
+ }
|
|
|
+ assertNotNull(opportunisticContainerId);
|
|
|
+
|
|
|
+ // The RM sees the container as OPPORTUNISTIC
|
|
|
+ ResourceScheduler scheduler = cluster.getResourceManager()
|
|
|
+ .getResourceScheduler();
|
|
|
+ RMContainer rmContainer = scheduler.getRMContainer(
|
|
|
+ opportunisticContainerId);
|
|
|
+ assertEquals(ExecutionType.OPPORTUNISTIC,
|
|
|
+ rmContainer.getExecutionType());
|
|
|
+
|
|
|
+ // Release the opportunistic container
|
|
|
+ amClient.releaseAssignedContainer(opportunisticContainerId);
|
|
|
+ // Wait for the release container to appear
|
|
|
+ boolean released = false;
|
|
|
+ for (int i=0; i<10 && !released; i++) {
|
|
|
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
|
|
|
+ List<ContainerStatus> completedContainers =
|
|
|
+ allocResponse.getCompletedContainersStatuses();
|
|
|
+ for (ContainerStatus completedContainer : completedContainers) {
|
|
|
+ ContainerId completedContainerId =
|
|
|
+ completedContainer.getContainerId();
|
|
|
+ assertEquals(completedContainerId, opportunisticContainerId);
|
|
|
+ released = true;
|
|
|
+ }
|
|
|
+ if (!released) {
|
|
|
+ sleep(100);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ assertTrue(released);
|
|
|
+
|
|
|
+ // The RM shouldn't see the container anymore
|
|
|
+ rmContainer = scheduler.getRMContainer(opportunisticContainerId);
|
|
|
+ assertNull(rmContainer);
|
|
|
+
|
|
|
+ // Clean the AM
|
|
|
+ amClient.unregisterApplicationMaster(
|
|
|
+ FinalApplicationStatus.SUCCEEDED, null, null);
|
|
|
+ } finally {
|
|
|
+ if (amClient != null &&
|
|
|
+ amClient.getServiceState() == Service.STATE.STARTED) {
|
|
|
+ amClient.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void sleep(int sleepTime) {
|
|
|
try {
|
|
|
Thread.sleep(sleepTime);
|