Browse Source

commit 5f8761f9b4042f0fb1e47846dd93713d8903cd67
Author: Lee Tucker <ltucker@yahoo-inc.com>
Date: Thu Jul 30 17:40:44 2009 -0700

Applying patch 2871273.mr722.patch


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1076956 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent
commit
1312d8dec8

+ 13 - 12
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -529,21 +529,22 @@ class CapacityTaskScheduler extends TaskScheduler {
             continue;
             continue;
           }
           }
         } else {
         } else {
-          //if memory requirements don't match then we check if the 
-          //job has either pending or speculative task or has insufficient number
-          //of 'reserved' tasktrackers to cover all pending tasks. If so
-          //we reserve the current tasktracker for this job so that 
-          //high memory jobs are not starved
-          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTrackerStatus) || 
-              !hasSufficientReservedTaskTrackers(j)) {
+          // if memory requirements don't match then we check if the job has
+          // pending tasks and has insufficient number of 'reserved'
+          // tasktrackers to cover all pending tasks. If so we reserve the
+          // current tasktracker for this job so that high memory jobs are not
+          // starved
+          if ((getPendingTasks(j) != 0 && !hasSufficientReservedTaskTrackers(j))) {
             // Reserve all available slots on this tasktracker
             // Reserve all available slots on this tasktracker
-            LOG.info(j.getJobID() + ": Reserving " + taskTracker.getTrackerName() + 
-                     " since memory-requirements don't match");
-            taskTracker.reserveSlots(type, j, taskTracker.getAvailableSlots(type));
-            
+            LOG.info(j.getJobID() + ": Reserving "
+                + taskTracker.getTrackerName()
+                + " since memory-requirements don't match");
+            taskTracker.reserveSlots(type, j, taskTracker
+                .getAvailableSlots(type));
+
             // Block
             // Block
             return TaskLookupResult.getMemFailedResult();
             return TaskLookupResult.getMemFailedResult();
-          } 
+          }
         }//end of memory check block
         }//end of memory check block
         // if we're here, this job has no task to run. Look at the next job.
         // if we're here, this job has no task to run. Look at the next job.
       }//end of for loop
       }//end of for loop

+ 92 - 88
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -2048,30 +2048,29 @@ public class TestCapacityScheduler extends TestCase {
     jConf.setUser("u1");
     jConf.setUser("u1");
     FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
     FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
 
 
-    // Job2 cannot fit on tt2 or tt1. Blocking. Job3 also will not run.
+    // Job2 cannot fit on tt1. So tt1 is reserved for a map slot of job2
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt2")));
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt2")));
-    // reserved tasktrackers contribute to occupied slots
-    // for maps, both tasktrackers are reserved.
-    checkOccupiedSlots("default", TaskType.MAP, 1, 7, 175.0f);
-    // for reduces, only one tasktracker is reserved, because
-    // the reduce scheduler is not visited for tt1 (as it has
-    // 0 slots free).
-    checkOccupiedSlots("default", TaskType.REDUCE, 1, 5,
-        125.0f);
+
+    // reserved tasktrackers contribute to occupied slots for maps.
+    checkOccupiedSlots("default", TaskType.MAP, 1, 5, 125.0f);
+    // occupied slots for reduces remain unchanged as tt1 is not reserved for
+    // reduces.
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 3, 75.0f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
     checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
     checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
     LOG.info(job2.getSchedulingInfo());
     LOG.info(job2.getSchedulingInfo());
     assertEquals(String.format(
     assertEquals(String.format(
         CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
         CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
-        1, 2, 4, 1, 2, 2),
+        1, 2, 2, 1, 2, 0),
         (String) job2.getSchedulingInfo());
         (String) job2.getSchedulingInfo());
     assertEquals(String.format(
     assertEquals(String.format(
         CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
         CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
         0, 0, 0, 0, 0, 0),
         0, 0, 0, 0, 0, 0),
         (String) job3.getSchedulingInfo());
         (String) job3.getSchedulingInfo());
+
+    // One reservation is already done for job2. So job3 should go ahead.
+    checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
   }
   }
 
 
   /**
   /**
@@ -2574,18 +2573,14 @@ public class TestCapacityScheduler extends TestCase {
   }
   }
 
 
   /**
   /**
-   * Test case to test scheduling of jobs with speculative execution
-   * in the face of high RAM jobs.
-   * 
-   * Essentially, the test verifies that if a high RAM job has speculative
-   * tasks that cannot run because of memory requirements, we block
-   * that node and do not return any tasks to it.
-   * 
+   * Test to verify that TTs are reserved for high memory jobs, but only till a
+   * TT is reserved for each of the pending task.
    * @throws IOException
    * @throws IOException
    */
    */
-  public void testHighRamJobWithSpeculativeExecution() throws IOException {
-    // 2 TTs, 3 map and 3 reduce slots on each TT
-    taskTrackerManager = new FakeTaskTrackerManager(2, 3, 3);
+  public void testTTReservingWithHighMemoryJobs()
+      throws IOException {
+    // 3 taskTrackers, 2 map and 0 reduce slots on each TT
+    taskTrackerManager = new FakeTaskTrackerManager(3, 2, 0);
 
 
     taskTrackerManager.addQueues(new String[] { "default" });
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -2593,96 +2588,105 @@ public class TestCapacityScheduler extends TestCase {
     resConf.setFakeQueues(queues);
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
     // enabled memory-based scheduling
-    // 1GB for each map, 1GB for each reduce
+    // Normal job in the cluster would be 1GB maps/reduces
     scheduler.getConf().setLong(
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
-        3 * 1024L);
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
     scheduler.getConf().setLong(
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L);
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.getConf().setLong(
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
-        3 * 1024L);
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.getConf().setLong(
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024L);
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.setResourceManagerConf(resConf);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
     scheduler.start();
 
 
-    // Submit a normal job that should occupy a node
+    LOG.debug("Submit a regular memory(1GB vmem maps/reduces) job of "
+        + "3 map/red tasks");
     JobConf jConf = new JobConf(conf);
     JobConf jConf = new JobConf(conf);
+    jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(1 * 1024);
     jConf.setMemoryForMapTask(1 * 1024);
-    jConf.setMemoryForReduceTask(0);
-    jConf.setNumMapTasks(2);
-    jConf.setNumReduceTasks(0);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(3);
+    jConf.setNumReduceTasks(3);
     jConf.setQueueName("default");
     jConf.setQueueName("default");
     jConf.setUser("u1");
     jConf.setUser("u1");
-    FakeJobInProgress job1 = submitJob(JobStatus.PREP, jConf);
-    
-    //Submit a high memory job with speculative tasks.
-    jConf = new JobConf();
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    // assign one map task of job1 on all the TTs
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt3", "attempt_test_0001_m_000003_0 on tt3");
+    scheduler.updateQSIInfoForTests();
+
+    LOG.info(job1.getSchedulingInfo());
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 3, 3, 0, 0,
+        0, 0), (String) job1.getSchedulingInfo());
+
+    LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
+        + "2 map tasks");
     jConf.setMemoryForMapTask(2 * 1024);
     jConf.setMemoryForMapTask(2 * 1024);
     jConf.setMemoryForReduceTask(0);
     jConf.setMemoryForReduceTask(0);
-    jConf.setNumMapTasks(1);
+    jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(0);
     jConf.setNumReduceTasks(0);
     jConf.setQueueName("default");
     jConf.setQueueName("default");
     jConf.setUser("u1");
     jConf.setUser("u1");
-    jConf.setMapSpeculativeExecution(true);
-    jConf.setReduceSpeculativeExecution(false);
-    FakeJobInProgress job2 =
-        new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
-            taskTrackerManager, "u1");
-    taskTrackerManager.submitJob(job2);
-
-    //Submit normal job
+    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
+        + "2 map/red tasks");
     jConf = new JobConf(conf);
     jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(1 * 1024);
     jConf.setMemoryForMapTask(1 * 1024);
-    jConf.setMemoryForReduceTask(0);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(0);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(2);
     jConf.setQueueName("default");
     jConf.setQueueName("default");
     jConf.setUser("u1");
     jConf.setUser("u1");
-    jConf.setMapSpeculativeExecution(false);
-    jConf.setReduceSpeculativeExecution(false);
-    FakeJobInProgress job3 = submitJob(JobStatus.PREP, jConf);
-
-    controlledInitializationPoller.selectJobsToInitialize();
-    raiseStatusChangeEvents(scheduler.jobQueuesManager);
+    FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
 
 
-    // Have one node on which all tasks of job1 are scheduled.
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    // Job2, a high memory job cannot be accommodated on a any TT. But with each
+    // trip to the scheduler, each of the TT should be reserved by job2.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    scheduler.updateQSIInfoForTests();
+    LOG.info(job2.getSchedulingInfo());
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 2, 0,
+        0, 0), (String) job2.getSchedulingInfo());
 
 
-    // raise events to initialize the 3rd job
-    controlledInitializationPoller.selectJobsToInitialize();
-    raiseStatusChangeEvents(scheduler.jobQueuesManager);
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+    scheduler.updateQSIInfoForTests();
+    LOG.info(job2.getSchedulingInfo());
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 4, 0,
+        0, 0), (String) job2.getSchedulingInfo());
 
 
-    // On the second node, one task of the high RAM job can be scheduled.
-    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
-    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
-    assertEquals("pending maps greater than zero " , job2.pendingMaps(), 0);
-    // Total 4 map slots should be accounted for.
-    checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
-    
-    // now when the first node gets back, it cannot run any task
-    // because job2 has a speculative task that can run on this node.
-    // This is even though job3's tasks can run on this node.
+    // Job2 has only 2 pending tasks. So no more reservations. Job3 should get
+    // slots on tt3. tt1 and tt2 should not be assigned any slots with the
+    // reservation stats intact.
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt1")));
-    // Reservation will count for 2 more slots.
-    checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
-    
-    // finish one task from tt1.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", 
-        job1);
-    
-    // now, we can schedule the speculative task on tt1
-    checkAssignment("tt1", "attempt_test_0002_m_000001_1 on tt1");
-    
-    // finish one more task from tt1.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", 
-        job1);
-    
-    // now the new job's tasks can be scheduled.
-    checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
-  }
+    scheduler.updateQSIInfoForTests();
+    LOG.info(job2.getSchedulingInfo());
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 4, 0,
+        0, 0), (String) job2.getSchedulingInfo());
+
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+    scheduler.updateQSIInfoForTests();
+    LOG.info(job2.getSchedulingInfo());
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 4, 0,
+        0, 0), (String) job2.getSchedulingInfo());
+
+    checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
+    scheduler.updateQSIInfoForTests();
+    LOG.info(job2.getSchedulingInfo());
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 4, 0,
+        0, 0), (String) job2.getSchedulingInfo());
+
+    // No more tasks there in job3 also
+    assertNull(scheduler.assignTasks(tracker("tt3")));
+}
 
 
   /**
   /**
    * Test to verify that queue ordering is based on the number of slots occupied
    * Test to verify that queue ordering is based on the number of slots occupied