|
@@ -36,7 +36,6 @@ import org.apache.commons.logging.LogFactory;
|
|
|
|
|
|
import org.apache.hadoop.io.BytesWritable;
|
|
|
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
|
|
|
-import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
|
|
import org.apache.hadoop.mapreduce.TaskType;
|
|
@@ -864,83 +863,45 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Test the max map limit.
|
|
|
+ * Test the max Capacity for map and reduce
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public void testMaxMapCap() throws IOException {
|
|
|
+ public void testMaxCapacities() throws IOException {
|
|
|
this.setUp(4,1,1);
|
|
|
taskTrackerManager.addQueues(new String[] {"default"});
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
- queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
|
|
|
- resConf.setFakeQueues(queues);
|
|
|
- resConf.setMaxMapCap("default",2);
|
|
|
- resConf.setMaxReduceCap("default",-1);
|
|
|
- scheduler.setResourceManagerConf(resConf);
|
|
|
- scheduler.start();
|
|
|
-
|
|
|
- //submit the Job
|
|
|
- FakeJobInProgress fjob1 =
|
|
|
- submitJobAndInit(JobStatus.PREP,3,1,"default","user");
|
|
|
-
|
|
|
- List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
|
|
|
- List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
|
|
|
-
|
|
|
- //Once the 2 tasks are running the third assigment should be reduce.
|
|
|
- checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
|
|
|
- //This should fail.
|
|
|
- List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
|
|
|
- assertNull(task4);
|
|
|
- //Now complete the task 1.
|
|
|
- // complete the job
|
|
|
- taskTrackerManager.finishTask("tt1", task1.get(0).getTaskID().toString(),
|
|
|
- fjob1);
|
|
|
- //We have completed the tt1 task which was a map task so we expect one map
|
|
|
- //task to be picked up
|
|
|
- checkAssignment("tt4","attempt_test_0001_m_000003_0 on tt4");
|
|
|
- }
|
|
|
+ queues.add(new FakeQueueInfo("default", 25.0f, false, 1));
|
|
|
|
|
|
- /**
|
|
|
- * Test max reduce limit
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- public void testMaxReduceCap() throws IOException {
|
|
|
- this.setUp(4, 1, 1);
|
|
|
- taskTrackerManager.addQueues(new String[]{"default"});
|
|
|
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
- queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
|
|
|
resConf.setFakeQueues(queues);
|
|
|
- resConf.setMaxMapCap("default", -1);
|
|
|
- resConf.setMaxReduceCap("default", 2);
|
|
|
+ resConf.setMaxCapacity("default", 50.0f);
|
|
|
scheduler.setResourceManagerConf(resConf);
|
|
|
+ scheduler.setAssignMultipleTasks(true);
|
|
|
scheduler.start();
|
|
|
|
|
|
//submit the Job
|
|
|
FakeJobInProgress fjob1 =
|
|
|
- submitJobAndInit(JobStatus.PREP, 1, 3, "default", "user");
|
|
|
+ submitJobAndInit(JobStatus.PREP, 4, 4, "default", "user");
|
|
|
|
|
|
- List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
|
|
|
- List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
|
|
|
- List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
|
|
|
+ //default queue has min capacity of 1 and max capacity of 2
|
|
|
|
|
|
- //This should fail. 1 map, 2 reduces , we have reached the limit.
|
|
|
- List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
|
|
|
- assertNull(task4);
|
|
|
- //Now complete the task 1 i.e map task.
|
|
|
- // complete the job
|
|
|
- taskTrackerManager.finishTask(
|
|
|
- "tt1", task1.get(0).getTaskID().toString(),
|
|
|
- fjob1);
|
|
|
+ //first call of assign task should give task from default queue.
|
|
|
+ //default uses 1 map and 1 reduce slots are used
|
|
|
+ checkMultipleAssignment(
|
|
|
+ "tt1", "attempt_test_0001_m_000001_0 on tt1",
|
|
|
+ "attempt_test_0001_r_000001_0 on tt1");
|
|
|
|
|
|
- //This should still fail as only map task is done
|
|
|
- task4 = scheduler.assignTasks(tracker("tt4"));
|
|
|
- assertNull(task4);
|
|
|
+ //second call of assign task
|
|
|
+ //default uses 2 map and 2 reduce slots
|
|
|
+ checkMultipleAssignment(
|
|
|
+ "tt2", "attempt_test_0001_m_000002_0 on tt2",
|
|
|
+ "attempt_test_0001_r_000002_0 on tt2");
|
|
|
|
|
|
- //Complete the reduce task
|
|
|
- taskTrackerManager.finishTask(
|
|
|
- "tt2", task2.get(0).getTaskID().toString(), fjob1);
|
|
|
|
|
|
- //One reduce is done hence assign the new reduce.
|
|
|
- checkAssignment("tt4","attempt_test_0001_r_000003_0 on tt4");
|
|
|
+ //Now we have reached the max capacity limit for default ,
|
|
|
+ //no further tasks would be assigned to this queue.
|
|
|
+ checkMultipleAssignment(
|
|
|
+ "tt3", null,
|
|
|
+ null);
|
|
|
}
|
|
|
|
|
|
// test if the queue reflects the changes
|
|
@@ -1295,6 +1256,27 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
assertEquals(18.75f, resConf.getCapacity("q4"));
|
|
|
}
|
|
|
|
|
|
+ public void testCapacityAllocFailureWithLowerMaxCapacity()
|
|
|
+ throws Exception {
|
|
|
+ String[] qs = {"default", "q1"};
|
|
|
+ taskTrackerManager.addQueues(qs);
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
|
|
|
+ queues.add(new FakeQueueInfo("q1", -1.0f, true, 50));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+ resConf.setMaxCapacity("q1", 40.0f);
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ try {
|
|
|
+ scheduler.start();
|
|
|
+ fail("Scheduler start should fail ");
|
|
|
+ } catch (IllegalStateException ise) {
|
|
|
+ assertEquals(
|
|
|
+ ise.getMessage(),
|
|
|
+ " Allocated capacity of " + 50.0f + " to unconfigured queue " +
|
|
|
+ "q1" + " is greater than maximum Capacity " + 40.0f);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// Tests how capacity is computed and assignment of tasks done
|
|
|
// on the basis of the capacity.
|
|
|
public void testCapacityBasedAllocation() throws Exception {
|
|
@@ -1377,26 +1359,26 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Creates a queue with max task limit of 2
|
|
|
+ * Creates a queue with max capacity of 50%
|
|
|
* submit 1 job in the queue which is high ram(2 slots) . As 2 slots are
|
|
|
* given to high ram job and are reserved , no other tasks are accepted .
|
|
|
*
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public void testHighMemoryBlockingWithMaxLimit()
|
|
|
+ public void testHighMemoryBlockingWithMaxCapacity()
|
|
|
throws IOException {
|
|
|
|
|
|
- // 2 map and 1 reduce slots
|
|
|
- taskTrackerManager = new FakeTaskTrackerManager(2, 2, 1);
|
|
|
+ taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
|
|
|
|
|
|
taskTrackerManager.addQueues(new String[] { "defaultXYZ" });
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
- queues.add(new FakeQueueInfo("defaultXYZ", 100.0f, true, 25));
|
|
|
+ queues.add(new FakeQueueInfo("defaultXYZ", 25.0f, true, 50));
|
|
|
resConf.setFakeQueues(queues);
|
|
|
- resConf.setMaxMapCap("defaultXYZ",2);
|
|
|
+
|
|
|
+ //defaultXYZ can go up to 2 map and 2 reduce slots
|
|
|
+ resConf.setMaxCapacity("defaultXYZ", 50.0f);
|
|
|
+
|
|
|
scheduler.setTaskTrackerManager(taskTrackerManager);
|
|
|
- // enabled memory-based scheduling
|
|
|
- // Normal job in the cluster would be 1GB maps/reduces
|
|
|
scheduler.getConf().setLong(
|
|
|
JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
|
|
|
2 * 1024);
|
|
@@ -1404,74 +1386,92 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
|
|
|
scheduler.getConf().setLong(
|
|
|
JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
|
|
|
- 1 * 1024);
|
|
|
+ 2 * 1024);
|
|
|
scheduler.getConf().setLong(
|
|
|
JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
|
|
|
scheduler.setResourceManagerConf(resConf);
|
|
|
scheduler.start();
|
|
|
+ scheduler.setAssignMultipleTasks(true);
|
|
|
|
|
|
- // The situation : Submit 2 jobs with high memory map task
|
|
|
- //Set the max limit for queue to 2 ,
|
|
|
- // try submitting more map tasks to the queue , it should not happen
|
|
|
-
|
|
|
- LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
|
|
|
- + "2 map tasks");
|
|
|
JobConf jConf = new JobConf(conf);
|
|
|
jConf.setMemoryForMapTask(2 * 1024);
|
|
|
- jConf.setMemoryForReduceTask(0);
|
|
|
+ jConf.setMemoryForReduceTask(1 * 1024);
|
|
|
jConf.setNumMapTasks(2);
|
|
|
- jConf.setNumReduceTasks(0);
|
|
|
+ jConf.setNumReduceTasks(1);
|
|
|
jConf.setQueueName("defaultXYZ");
|
|
|
jConf.setUser("u1");
|
|
|
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
|
|
|
|
|
|
- LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
|
|
|
- + "2 map/red tasks");
|
|
|
jConf = new JobConf(conf);
|
|
|
jConf.setMemoryForMapTask(1 * 1024);
|
|
|
- jConf.setMemoryForReduceTask(1 * 1024);
|
|
|
- jConf.setNumMapTasks(2);
|
|
|
+ jConf.setMemoryForReduceTask(2 * 1024);
|
|
|
+ jConf.setNumMapTasks(1);
|
|
|
jConf.setNumReduceTasks(2);
|
|
|
jConf.setQueueName("defaultXYZ");
|
|
|
jConf.setUser("u1");
|
|
|
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
|
|
|
|
|
|
- // first, a map from j1 will run this is a high memory job so it would
|
|
|
- // occupy the 2 slots
|
|
|
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
|
|
|
- checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
|
|
|
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
|
|
|
+ //high ram map from job 1 and normal reduce task from job 1
|
|
|
+ List<Task> tasks = checkMultipleAssignment(
|
|
|
+ "tt1", "attempt_test_0001_m_000001_0 on tt1",
|
|
|
+ "attempt_test_0001_r_000001_0 on tt1");
|
|
|
|
|
|
- // at this point, the scheduler tries to schedule another map from j1.
|
|
|
- // there isn't enough space. The second job's reduce should be scheduled.
|
|
|
- checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
|
|
|
-
|
|
|
- checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
|
|
|
+ checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1, 2, 200.0f,1,0);
|
|
|
+ checkOccupiedSlots("defaultXYZ", TaskType.REDUCE, 1, 1, 100.0f,0,2);
|
|
|
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
|
|
|
|
|
|
- //at this point , the scheduler tries to schedule another map from j2 for
|
|
|
- //another task tracker.
|
|
|
- // This should not happen as all the map slots are taken
|
|
|
- //by the first task itself.hence reduce task from the second job is given
|
|
|
+ //we have reached the maximum limit for map, so no more map tasks.
|
|
|
+ //we have used 1 reduce already and 1 more reduce slot is left for the
|
|
|
+ //before we reach maxcapacity for reduces.
|
|
|
+ // But current 1 slot + 2 slots for high ram reduce would
|
|
|
+ //mean we are crossing the maxium capacity.hence nothing would be assigned
|
|
|
+ //in this call
|
|
|
+ checkMultipleAssignment("tt2",null,null);
|
|
|
|
|
|
- checkAssignment("tt2","attempt_test_0002_r_000002_0 on tt2");
|
|
|
+ //complete the high ram job on tt1.
|
|
|
+ for (Task task : tasks) {
|
|
|
+ taskTrackerManager.finishTask(
|
|
|
+ "tt1", task.getTaskID().toString(),
|
|
|
+ job1);
|
|
|
+ }
|
|
|
+
|
|
|
+ //At this point we have 1 high ram map and 1 high ram reduce.
|
|
|
+ List<Task> t2 = checkMultipleAssignment(
|
|
|
+ "tt2", "attempt_test_0001_m_000002_0 on tt2",
|
|
|
+ "attempt_test_0002_r_000001_0 on tt2");
|
|
|
+
|
|
|
+ checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1, 2, 200.0f,1,0);
|
|
|
+ checkOccupiedSlots("defaultXYZ", TaskType.REDUCE, 1, 2, 200.0f,0,2);
|
|
|
+ checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L);
|
|
|
+
|
|
|
+ //complete the high ram job on tt1.
|
|
|
+ for (Task task : t2) {
|
|
|
+ taskTrackerManager.finishTask(
|
|
|
+ "tt2", task.getTaskID().toString(),
|
|
|
+ job2);
|
|
|
+ }
|
|
|
+
|
|
|
+ //1st map & 2nd reduce from job2
|
|
|
+ checkMultipleAssignment(
|
|
|
+ "tt2", "attempt_test_0002_m_000001_0 on tt2",
|
|
|
+ "attempt_test_0002_r_000002_0 on tt2");
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* test if user limits automatically adjust to max map or reduce limit
|
|
|
*/
|
|
|
- public void testUserLimitsWithMaxLimits() throws Exception {
|
|
|
- setUp(4, 4, 4);
|
|
|
+ public void testUserLimitsWithMaxCapacities() throws Exception {
|
|
|
+ setUp(2, 2, 2);
|
|
|
// set up some queues
|
|
|
String[] qs = {"default"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
- queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
|
|
|
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
|
|
|
resConf.setFakeQueues(queues);
|
|
|
- resConf.setMaxMapCap("default", 2);
|
|
|
- resConf.setMaxReduceCap("default", 2);
|
|
|
+ resConf.setMaxCapacity("default", 75.0f);
|
|
|
scheduler.setResourceManagerConf(resConf);
|
|
|
+ scheduler.setAssignMultipleTasks(true);
|
|
|
scheduler.start();
|
|
|
|
|
|
// submit a job
|
|
@@ -1480,37 +1480,27 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
FakeJobInProgress fjob2 =
|
|
|
submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
|
|
|
|
|
|
- // for queue 'default', the capacity for maps is 2.
|
|
|
- // But the max map limit is 2
|
|
|
- // hence user should be getting not more than 1 as it is the 50%.
|
|
|
- Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
+ // for queue 'default', maxCapacity for map and reduce is 3.
|
|
|
+ // initial user limit for 50% assuming there are 2 users/queue is.
|
|
|
+ // 1 map and 1 reduce.
|
|
|
+ // after max capacity it is 1.5 each.
|
|
|
|
|
|
- //Now we should get the task from the other job. As the
|
|
|
- //first user has reached his max map limit.
|
|
|
- checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
|
|
|
+ //first job would be given 1 job each.
|
|
|
+ List<Task> t1 = this.checkMultipleAssignment(
|
|
|
+ "tt1", "attempt_test_0001_m_000001_0 on tt1",
|
|
|
+ "attempt_test_0001_r_000001_0 on tt1");
|
|
|
|
|
|
- //Now we are done with map limit , now if we ask for task we should
|
|
|
- // get reduce from 1st job
|
|
|
- checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
|
|
|
- // Now we're at full capacity for maps. 1 done with reduces for job 1 so
|
|
|
- // now we should get 1 reduces for job 2
|
|
|
- Task t4 = checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4");
|
|
|
+ //for user u1 we have reached the limit. that is 1 job.
|
|
|
+ //1 more map and reduce tasks.
|
|
|
+ List<Task> t2 = this.checkMultipleAssignment(
|
|
|
+ "tt1", "attempt_test_0002_m_000001_0 on tt1",
|
|
|
+ "attempt_test_0002_r_000001_0 on tt1");
|
|
|
|
|
|
- taskTrackerManager.finishTask(
|
|
|
- "tt1", t1.getTaskID().toString(),
|
|
|
- fjob1);
|
|
|
-
|
|
|
- //tt1 completed the task so we have 1 map slot for u1
|
|
|
- // we are assigning the 2nd map task from fjob1
|
|
|
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
|
|
|
-
|
|
|
- taskTrackerManager.finishTask(
|
|
|
- "tt4", t4.getTaskID().toString(),
|
|
|
- fjob2);
|
|
|
- //tt4 completed the task , so we have 1 reduce slot for u2
|
|
|
- //we are assigning the 2nd reduce from fjob2
|
|
|
- checkAssignment("tt4", "attempt_test_0002_r_000002_0 on tt4");
|
|
|
+ t1 = this.checkMultipleAssignment(
|
|
|
+ "tt2", "attempt_test_0001_m_000002_0 on tt2",
|
|
|
+ "attempt_test_0001_r_000002_0 on tt2");
|
|
|
|
|
|
+ t1 = this.checkMultipleAssignment("tt2", null,null);
|
|
|
}
|
|
|
|
|
|
|
|
@@ -3085,20 +3075,22 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
private void checkMemReservedForTasksOnTT(String taskTracker,
|
|
|
Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
|
|
|
Long observedMemForMapsOnTT =
|
|
|
- scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker).getStatus(),
|
|
|
+ scheduler.memoryMatcher.getMemReservedForTasks(
|
|
|
+ tracker(taskTracker).getStatus(),
|
|
|
TaskType.MAP);
|
|
|
Long observedMemForReducesOnTT =
|
|
|
- scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker).getStatus(),
|
|
|
+ scheduler.memoryMatcher.getMemReservedForTasks(
|
|
|
+ tracker(taskTracker).getStatus(),
|
|
|
TaskType.REDUCE);
|
|
|
if (expectedMemForMapsOnTT == null) {
|
|
|
- assertTrue(observedMemForMapsOnTT == null);
|
|
|
+ assertEquals(observedMemForMapsOnTT, null);
|
|
|
} else {
|
|
|
- assertTrue(observedMemForMapsOnTT.equals(expectedMemForMapsOnTT));
|
|
|
+ assertEquals(observedMemForMapsOnTT, (expectedMemForMapsOnTT));
|
|
|
}
|
|
|
if (expectedMemForReducesOnTT == null) {
|
|
|
- assertTrue(observedMemForReducesOnTT == null);
|
|
|
+ assertEquals(observedMemForReducesOnTT, null);
|
|
|
} else {
|
|
|
- assertTrue(observedMemForReducesOnTT.equals(expectedMemForReducesOnTT));
|
|
|
+ assertEquals(observedMemForReducesOnTT, (expectedMemForReducesOnTT));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -3187,4 +3179,48 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
assertEquals(scheduler.getLimitMaxMemForMapSlot(),3);
|
|
|
assertEquals(scheduler.getLimitMaxMemForReduceSlot(),3);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Checks for multiple assignment.
|
|
|
+ *
|
|
|
+ * @param taskTrackerName
|
|
|
+ * @param mapAttempt
|
|
|
+ * @param reduceAttempt
|
|
|
+ * @return
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ private List<Task> checkMultipleAssignment(
|
|
|
+ String taskTrackerName, String mapAttempt, String reduceAttempt)
|
|
|
+ throws IOException {
|
|
|
+ List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
|
|
|
+ LOG.info(
|
|
|
+ " mapAttempt " + mapAttempt + " reduceAttempt " + reduceAttempt +
|
|
|
+ " assignTasks result " + tasks);
|
|
|
+
|
|
|
+ if (tasks == null || tasks.isEmpty()) {
|
|
|
+ if (mapAttempt != null || reduceAttempt != null ) {
|
|
|
+ fail(
|
|
|
+ " improper attempt " + tasks + " expected attempts are map : " +
|
|
|
+ mapAttempt + " reduce : " + reduceAttempt);
|
|
|
+ } else {
|
|
|
+ return tasks;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (tasks.size() == 1 && (mapAttempt != null && reduceAttempt != null)) {
|
|
|
+ fail(
|
|
|
+ " improper attempt " + tasks + " expected attempts are map : " +
|
|
|
+ mapAttempt + " reduce : " + reduceAttempt);
|
|
|
+ }
|
|
|
+ for (Task task : tasks) {
|
|
|
+ if (task.toString().contains("_m_")) {
|
|
|
+ assertEquals(task.toString(), mapAttempt);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (task.toString().contains("_r")) {
|
|
|
+ assertEquals(task.toString(), reduceAttempt);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return tasks;
|
|
|
+ }
|
|
|
}
|