@@ -487,7 +487,18 @@ public class TestCapacityScheduler extends TestCase {
                                          maxReduceTasksPerTracker));
     trackers.put(ttName, tt);
   }
-
+
+  public void addTaskTracker(String ttName,
+                             int maxMapTasksPerTracker,
+                             int maxReduceTasksPerTracker) {
+    TaskTracker tt = new TaskTracker(ttName);
+    tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", 1,
+                                       new ArrayList<TaskStatus>(), 0, 0,
+                                       maxMapTasksPerTracker,
+                                       maxReduceTasksPerTracker));
+    trackers.put(ttName, tt);
+  }
+
   public ClusterStatus getClusterStatus() {
     int numTrackers = trackers.size();
     return new ClusterStatus(numTrackers, maps, reduces,
@@ -3039,7 +3050,82 @@ public class TestCapacityScheduler extends TestCase {
 
     // No more tasks there in job3 also
     assertEquals(0, scheduler.assignTasks(tracker("tt3")).size());
-}
+  }
+
+  /**
+   * Test to verify that TTs are not reserved when a task's memory requirement
+   * exceeds the total memory available on the TT.
+   * @throws IOException
+   */
+  public void testTTReservingInHeterogenousEnvironment()
+      throws IOException {
+    // 2 taskTrackers: 4 map slots on one and 3 map slots on the other.
+    taskTrackerManager = new FakeTaskTrackerManager(1, 4, 0);
+    taskTrackerManager.addTaskTracker("tt2", 3, 0);
+
+    taskTrackerManager.addQueues(new String[] { "default" });
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
+    resConf.setFakeQueues(queues);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // Enable memory-based scheduling.
+    // A normal job in the cluster needs 2GB per map/reduce task;
+    // the maximum allowed map/reduce memory is 8GB.
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 8 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 8 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 2 * 1024);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    LOG.debug("Submit a high-memory (7GB vmem maps) job of "
+        + "2 map & 0 red tasks");
+    JobConf jConf = new JobConf(conf);
+
+    jConf = new JobConf(conf);
+    // We require 7GB maps, so that's worth 4 slots on the cluster.
+    jConf.setMemoryForMapTask(7 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    // Hence, 4 + 4 slots are required in total for the two map tasks.
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job = submitJobAndInit(JobStatus.PREP, jConf);
+    // Heartbeat the trackers.
+    scheduler.assignTasks(tracker("tt1"));
+    scheduler.assignTasks(tracker("tt2"));
+    scheduler.updateQueueUsageForTests();
+    LOG.info(job.getSchedulingInfo());
+    // tt2 has only 3 map slots, while each map task of this job needs
+    // at least 4 slots to run.
+    // Hence tt2 should not be reserved at all, since the reservation would
+    // only waste slots that other jobs could use.
+    assertEquals("Tracker tt2 got reserved unnecessarily.",
+        0, scheduler.getMapScheduler().getNumReservedTaskTrackers(job));
+    assertEquals(
+        // Should be running only one map task worth four slots,
+        // and no reservations.
+        CapacityTaskScheduler.getJobQueueSchedInfo(1, 4, 0, 0, 0, 0),
+        (String) job.getSchedulingInfo());
+    jConf = new JobConf(conf);
+    // Try submitting a job worth 3 slots, targeting tt2:
+    // 5GB should be worth 3 slots (2GB per slot).
+    jConf.setMemoryForMapTask(5 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    // Just one map task, targeting the unreserved tt2.
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    submitJobAndInit(JobStatus.PREP, jConf);
+    // tt2 should get the assignment.
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+  }
 
   /**
    * Test to verify that queue ordering is based on the number of slots occupied