|
@@ -2617,15 +2617,78 @@ public class TestRMContainerAllocator {
|
|
|
new Text(rmAddr), ugiToken.getService());
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testConcurrentTaskLimitsDisabledIfSmaller() throws Exception {
|
|
|
+ final int MAP_COUNT = 1;
|
|
|
+ final int REDUCE_COUNT = 1;
|
|
|
+ final int MAP_LIMIT = 1;
|
|
|
+ final int REDUCE_LIMIT = 1;
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
|
|
|
+ conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
|
|
|
+ conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
|
|
|
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
|
|
|
+ ApplicationAttemptId appAttemptId =
|
|
|
+ ApplicationAttemptId.newInstance(appId, 1);
|
|
|
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
|
|
|
+ Job mockJob = mock(Job.class);
|
|
|
+ when(mockJob.getReport()).thenReturn(
|
|
|
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
|
|
|
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
|
|
|
+ when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT);
|
|
|
+ when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT);
|
|
|
+
|
|
|
+ final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
|
|
|
+ MyContainerAllocator allocator =
|
|
|
+ new MyContainerAllocator(null, conf, appAttemptId, mockJob,
|
|
|
+ new SystemClock()) {
|
|
|
+ @Override
|
|
|
+ protected void register() {
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected ApplicationMasterProtocol createSchedulerProxy() {
|
|
|
+ return mockScheduler;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void setRequestLimit(Priority priority,
|
|
|
+ Resource capability, int limit) {
|
|
|
+ Assert.fail("setRequestLimit() should not be invoked");
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ // create some map requests
|
|
|
+ ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
|
|
|
+ for (int i = 0; i < reqMapEvents.length; ++i) {
|
|
|
+ reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
|
|
|
+ }
|
|
|
+ allocator.sendRequests(Arrays.asList(reqMapEvents));
|
|
|
+ // create some reduce requests
|
|
|
+ ContainerRequestEvent[] reqReduceEvents =
|
|
|
+ new ContainerRequestEvent[REDUCE_COUNT];
|
|
|
+ for (int i = 0; i < reqReduceEvents.length; ++i) {
|
|
|
+ reqReduceEvents[i] =
|
|
|
+ createReq(jobId, i, 1024, new String[] {}, false, true);
|
|
|
+ }
|
|
|
+ allocator.sendRequests(Arrays.asList(reqReduceEvents));
|
|
|
+ allocator.schedule();
|
|
|
+ allocator.schedule();
|
|
|
+ allocator.schedule();
|
|
|
+ allocator.close();
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testConcurrentTaskLimits() throws Exception {
|
|
|
+ final int MAP_COUNT = 5;
|
|
|
+ final int REDUCE_COUNT = 2;
|
|
|
final int MAP_LIMIT = 3;
|
|
|
final int REDUCE_LIMIT = 1;
|
|
|
LOG.info("Running testConcurrentTaskLimits");
|
|
|
Configuration conf = new Configuration();
|
|
|
conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
|
|
|
conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
|
|
|
- conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
|
|
|
+ conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
|
|
|
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
|
|
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
|
|
|
appId, 1);
|
|
@@ -2634,6 +2697,9 @@ public class TestRMContainerAllocator {
|
|
|
when(mockJob.getReport()).thenReturn(
|
|
|
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
|
|
|
0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
|
|
|
+ when(mockJob.getTotalMaps()).thenReturn(MAP_COUNT);
|
|
|
+ when(mockJob.getTotalReduces()).thenReturn(REDUCE_COUNT);
|
|
|
+
|
|
|
final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
|
|
|
MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
|
|
|
appAttemptId, mockJob, new SystemClock()) {
|
|
@@ -2648,14 +2714,13 @@ public class TestRMContainerAllocator {
|
|
|
};
|
|
|
|
|
|
// create some map requests
|
|
|
- ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[5];
|
|
|
+ ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
|
|
|
for (int i = 0; i < reqMapEvents.length; ++i) {
|
|
|
reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
|
|
|
}
|
|
|
allocator.sendRequests(Arrays.asList(reqMapEvents));
|
|
|
-
|
|
|
// create some reduce requests
|
|
|
- ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[2];
|
|
|
+ ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[REDUCE_COUNT];
|
|
|
for (int i = 0; i < reqReduceEvents.length; ++i) {
|
|
|
reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {},
|
|
|
false, true);
|