@@ -31,9 +31,11 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -80,7 +82,13 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -88,6 +96,10 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -2382,6 +2394,208 @@ public class TestRMContainerAllocator {
         new Text(rmAddr), ugiToken.getService());
   }
 
+  @Test
+  public void testConcurrentTaskLimits() throws Exception {
+    final int MAP_LIMIT = 3;
+    final int REDUCE_LIMIT = 1;
+    LOG.info("Running testConcurrentTaskLimits");
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
+    conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+    MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
+        appAttemptId, mockJob) {
+      @Override
+      protected void register() {
+      }
+
+      @Override
+      protected ApplicationMasterProtocol createSchedulerProxy() {
+        return mockScheduler;
+      }
+    };
+
+    // create some map requests
+    ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[5];
+    for (int i = 0; i < reqMapEvents.length; ++i) {
+      reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
+    }
+    allocator.sendRequests(Arrays.asList(reqMapEvents));
+
+    // create some reduce requests
+    ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[2];
+    for (int i = 0; i < reqReduceEvents.length; ++i) {
+      reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {},
+          false, true);
+    }
+    allocator.sendRequests(Arrays.asList(reqReduceEvents));
+    allocator.schedule();
+
+    // verify all of the host-specific asks were sent plus one for the
+    // default rack and one for the ANY request
+    Assert.assertEquals(reqMapEvents.length + 2, mockScheduler.lastAsk.size());
+
+    // verify AM is only asking for the map limit overall
+    Assert.assertEquals(MAP_LIMIT, mockScheduler.lastAnyAskMap);
+
+    // assign a map task and verify we do not ask for any more maps
+    ContainerId cid0 = mockScheduler.assignContainer("h0", false);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(2, mockScheduler.lastAnyAskMap);
+
+    // complete the map task and verify that we ask for one more
+    mockScheduler.completeContainer(cid0);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(3, mockScheduler.lastAnyAskMap);
+
+    // assign three more maps and verify we ask for no more maps
+    ContainerId cid1 = mockScheduler.assignContainer("h1", false);
+    ContainerId cid2 = mockScheduler.assignContainer("h2", false);
+    ContainerId cid3 = mockScheduler.assignContainer("h3", false);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
+
+    // complete two containers and verify we only asked for one more
+    // since at that point all maps should be scheduled/completed
+    mockScheduler.completeContainer(cid2);
+    mockScheduler.completeContainer(cid3);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(1, mockScheduler.lastAnyAskMap);
+
+    // allocate the last container and complete the first one
+    // and verify there are no more map asks.
+    mockScheduler.completeContainer(cid1);
+    ContainerId cid4 = mockScheduler.assignContainer("h4", false);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
+
+    // complete the last map
+    mockScheduler.completeContainer(cid4);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
+
+    // verify only reduce limit being requested
+    Assert.assertEquals(REDUCE_LIMIT, mockScheduler.lastAnyAskReduce);
+
+    // assign a reducer and verify ask goes to zero
+    cid0 = mockScheduler.assignContainer("h0", true);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
+
+    // complete the reducer and verify we ask for another
+    mockScheduler.completeContainer(cid0);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(1, mockScheduler.lastAnyAskReduce);
+
+    // assign a reducer and verify ask goes to zero
+    cid0 = mockScheduler.assignContainer("h0", true);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
+
+    // complete the reducer and verify no more reducers
+    mockScheduler.completeContainer(cid0);
+    allocator.schedule();
+    allocator.schedule();
+    Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
+    allocator.close();
+  }
+
+  private static class MockScheduler implements ApplicationMasterProtocol {
+    ApplicationAttemptId attemptId;
+    long nextContainerId = 10;
+    List<ResourceRequest> lastAsk = null;
+    int lastAnyAskMap = 0;
+    int lastAnyAskReduce = 0;
+    List<ContainerStatus> containersToComplete =
+        new ArrayList<ContainerStatus>();
+    List<Container> containersToAllocate = new ArrayList<Container>();
+
+    public MockScheduler(ApplicationAttemptId attemptId) {
+      this.attemptId = attemptId;
+    }
+
+    @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(
+        RegisterApplicationMasterRequest request) throws YarnException,
+        IOException {
+      return RegisterApplicationMasterResponse.newInstance(
+          Resource.newInstance(512, 1),
+          Resource.newInstance(512000, 1024),
+          Collections.<ApplicationAccessType,String>emptyMap(),
+          ByteBuffer.wrap("fake_key".getBytes()),
+          Collections.<Container>emptyList(),
+          "default",
+          Collections.<NMToken>emptyList());
+    }
+
+    @Override
+    public FinishApplicationMasterResponse finishApplicationMaster(
+        FinishApplicationMasterRequest request) throws YarnException,
+        IOException {
+      return FinishApplicationMasterResponse.newInstance(false);
+    }
+
+    @Override
+    public AllocateResponse allocate(AllocateRequest request)
+        throws YarnException, IOException {
+      lastAsk = request.getAskList();
+      for (ResourceRequest req : lastAsk) {
+        if (ResourceRequest.ANY.equals(req.getResourceName())) {
+          Priority priority = req.getPriority();
+          if (priority.equals(RMContainerAllocator.PRIORITY_MAP)) {
+            lastAnyAskMap = req.getNumContainers();
+          } else if (priority.equals(RMContainerAllocator.PRIORITY_REDUCE)){
+            lastAnyAskReduce = req.getNumContainers();
+          }
+        }
+      }
+      AllocateResponse response = AllocateResponse.newInstance(
+          request.getResponseId(),
+          containersToComplete, containersToAllocate,
+          Collections.<NodeReport>emptyList(),
+          Resource.newInstance(512000, 1024), null, 10, null,
+          Collections.<NMToken>emptyList());
+      containersToComplete.clear();
+      containersToAllocate.clear();
+      return response;
+    }
+
+    public ContainerId assignContainer(String nodeName, boolean isReduce) {
+      ContainerId containerId =
+          ContainerId.newContainerId(attemptId, nextContainerId++);
+      Priority priority = isReduce ? RMContainerAllocator.PRIORITY_REDUCE
+          : RMContainerAllocator.PRIORITY_MAP;
+      Container container = Container.newInstance(containerId,
+          NodeId.newInstance(nodeName, 1234), nodeName + ":5678",
+          Resource.newInstance(1024, 1), priority, null);
+      containersToAllocate.add(container);
+      return containerId;
+    }
+
+    public void completeContainer(ContainerId containerId) {
+      containersToComplete.add(ContainerStatus.newInstance(containerId,
+          ContainerState.COMPLETE, "", 0));
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();