Browse Source

HADOOP-5932. Fixes a problem in capacity scheduler in computing available memory on a tasktracker. Contributed by Vinod Kumar Vavilapalli.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@779893 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 16 years ago
parent
commit
f69287df51

+ 4 - 0
CHANGES.txt

@@ -812,6 +812,10 @@ Release 0.20.1 - Unreleased
     output compression for merged data.
     (Jothi Padmanabhan and Billy Pearson via ddas)
 
+    HADOOP-5932. Fixes a problem in capacity scheduler in computing
+    available memory on a tasktracker.
+    (Vinod Kumar Vavilapalli via yhemanth)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

+ 2 - 2
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java

@@ -53,10 +53,9 @@ class MemoryMatcher {
    * @return amount of memory that is used by the residing tasks,
    *          null if memory cannot be computed for some reason.
    */
-  private synchronized Long getMemReservedForTasks(
+  synchronized Long getMemReservedForTasks(
       TaskTrackerStatus taskTracker, CapacityTaskScheduler.TYPE taskType) {
     long vmem = 0;
-    long myVmem = 0;
 
     for (TaskStatus task : taskTracker.getTaskReports()) {
       // the following task states are one in which the slot is
@@ -90,6 +89,7 @@ class MemoryMatcher {
         // tasks' memory limits to the nearest multiple of the slot-memory-size
         // set on JT. This essentially translates to tasks of a high memory job
         // using multiple slots.
+        long myVmem = 0;
         if (task.getIsMap() && taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
           myVmem = jConf.getMemoryForMapTask();
           myVmem =

+ 57 - 1
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -1589,9 +1589,12 @@ public class TestCapacityScheduler extends TestCase {
     
     // first, a map from j1 will run
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
     // at this point, the scheduler tries to schedule another map from j1. 
     // there isn't enough space. The second job's reduce should be scheduled.
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
   }
 
   /**
@@ -1637,7 +1640,9 @@ public class TestCapacityScheduler extends TestCase {
 
     // Fill the second tt with this job.
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
 
     LOG.debug("Submit one high memory(2GB maps/reduces) job of "
         + "2 map, 2 reduce tasks.");
@@ -1651,7 +1656,9 @@ public class TestCapacityScheduler extends TestCase {
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
 
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
 
     LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
         + "1 map, 0 reduce tasks.");
@@ -1669,6 +1676,8 @@ public class TestCapacityScheduler extends TestCase {
     assertNull(scheduler.assignTasks(tracker("tt2")));
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
   }
 
   /**
@@ -1717,6 +1726,7 @@ public class TestCapacityScheduler extends TestCase {
 
     // 1st cycle - 1 map gets assigned.
     Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1",  512L, 0L);
     
     // kill this job !
     taskTrackerManager.killJob(job1.getJobID());
@@ -1738,15 +1748,21 @@ public class TestCapacityScheduler extends TestCase {
     // 2nd cycle - nothing should get assigned. Memory matching code
     // will see the job is missing and fail memory requirements.
     assertNull(scheduler.assignTasks(tracker("tt1")));
+    checkMemReservedForTasksOnTT("tt1", null, null);
+
     // calling again should not make a difference, as the task is still running
     assertNull(scheduler.assignTasks(tracker("tt1")));
+    checkMemReservedForTasksOnTT("tt1", null, null);
     
     // finish the task on the tracker.
     taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
     // now a new task can be assigned.
     t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 512L, 0L);
+
     // reduce can be assigned.
     t = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 512L, 512L);
   }
   
   protected TaskTrackerStatus tracker(String taskTrackerName) {
@@ -1761,7 +1777,35 @@ public class TestCapacityScheduler extends TestCase {
     assertEquals(expectedTaskString, tasks.get(0).toString());
     return tasks.get(0);
   }
-  
+
+  /**
+   * Verifies the memory reserved for map and reduce tasks on a tasktracker, as
+   * reported by the scheduler's memory matcher, against the expected values.
+   * 
+   * @param taskTracker name of the tasktracker to inspect
+   * @param expectedMemForMapsOnTT expected memory reserved for maps; null
+   *          when the matcher is expected to be unable to compute it
+   * @param expectedMemForReducesOnTT expected memory reserved for reduces;
+   *          null when the matcher is expected to be unable to compute it
+   */
+  private void checkMemReservedForTasksOnTT(String taskTracker,
+      Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
+    Long observedMemForMapsOnTT =
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+            CapacityTaskScheduler.TYPE.MAP);
+    Long observedMemForReducesOnTT =
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+            CapacityTaskScheduler.TYPE.REDUCE);
+
+    // assertEquals(Object, Object) is null-safe on both sides, so a null
+    // observation fails the assertion with a message instead of throwing an
+    // NPE, and a mismatch reports the expected and observed values.
+    assertEquals("Memory reserved for maps on " + taskTracker,
+        expectedMemForMapsOnTT, observedMemForMapsOnTT);
+    assertEquals("Memory reserved for reduces on " + taskTracker,
+        expectedMemForReducesOnTT, observedMemForReducesOnTT);
+  }
+
   /*
    * Test cases for Job Initialization poller.
    */
@@ -2170,17 +2214,23 @@ public class TestCapacityScheduler extends TestCase {
     //scheduled. This task would be scheduled. Till the tasks from job1 gets
     //complete none of the tasks from other jobs would be scheduled.
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
     assertEquals("pending maps greater than zero " , job1.pendingMaps(), 0);
     //make same tracker get back, check if you are blocking. Your job
     //has speculative map task so tracker should be blocked even tho' it
     //can run job2's map.
     assertNull(scheduler.assignTasks(tracker("tt1")));
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
     //TT2 now gets speculative map of the job1
     checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
+    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
 
     // Now since the first job has no more speculative maps, it can schedule
     // the second job.
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 3 * 1024L, 0L);
 
     //finish everything
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", 
@@ -2223,7 +2273,9 @@ public class TestCapacityScheduler extends TestCase {
 
     // Finish up the map scheduler
     checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
     checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
+    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
 
     // first, a reduce from j3 will run
     // at this point, there is a speculative task for the same job to be
@@ -2231,16 +2283,20 @@ public class TestCapacityScheduler extends TestCase {
     //complete none of the tasks from other jobs would be scheduled.
     checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
     assertEquals("pending reduces greater than zero " , job3.pendingMaps(), 0);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2*1024L);
+
     //make same tracker get back, check if you are blocking. Your job
     //has speculative reduce task so tracker should be blocked even tho' it
     //can run job4's reduce.
     assertNull(scheduler.assignTasks(tracker("tt1")));
     //TT2 now gets speculative reduce of the job3
     checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2");
+    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L);
 
     // Now since j3 has no more speculative reduces, it can schedule
     // the j4.
     checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 3 * 1024L);
   }
 
   private void checkRunningJobMovementAndCompletion() throws IOException {