فهرست منبع

HADOOP-4274. Capacity scheduler accidently modifies the underlying
data structures when browing the job lists. (Hemanth Yamijala via omalley)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@700255 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 16 سال پیش
والد
کامیت
6e3888df75

+ 3 - 0
CHANGES.txt

@@ -784,6 +784,9 @@ Release 0.19.0 - Unreleased
     HADOOP-4237. Fixes the TestStreamingBadRecords.testNarrowDown testcase.
     HADOOP-4237. Fixes the TestStreamingBadRecords.testNarrowDown testcase.
     (Sharad Agarwal via ddas)
     (Sharad Agarwal via ddas)
 
 
+    HADOOP-4274. Capacity scheduler accidently modifies the underlying 
+    data structures when browing the job lists. (Hemanth Yamijala via omalley)
+
 Release 0.18.2 - Unreleased
 Release 0.18.2 - Unreleased
 
 
   BUG FIXES
   BUG FIXES

+ 5 - 4
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -1124,10 +1124,11 @@ class CapacityTaskScheduler extends TaskScheduler {
   
   
   @Override
   @Override
   public synchronized Collection<JobInProgress> getJobs(String queueName) {
   public synchronized Collection<JobInProgress> getJobs(String queueName) {
-    Collection<JobInProgress> jobCollection = 
-      jobQueuesManager.getRunningJobQueue(queueName);
-    if(jobCollection == null) {
-      jobCollection = new ArrayList<JobInProgress>();
+    Collection<JobInProgress> jobCollection = new ArrayList<JobInProgress>();
+    Collection<JobInProgress> runningJobs = 
+        jobQueuesManager.getRunningJobQueue(queueName);
+    if (runningJobs != null) {
+      jobCollection.addAll(runningJobs);
     }
     }
     Collection<JobInProgress> waitingJobs = 
     Collection<JobInProgress> waitingJobs = 
       jobQueuesManager.getWaitingJobQueue(queueName);
       jobQueuesManager.getWaitingJobQueue(queueName);

+ 31 - 0
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
@@ -446,6 +447,36 @@ public class TestCapacityScheduler extends TestCase {
     t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
   }
   }
   
   
+  public void testGetJobs() throws Exception {
+    // need only one queue
+    String[] qs = { "default" };
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 300, true, 100));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+    
+    // submit a job
+    JobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, "default", "u1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    
+    // submit another job
+    JobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "default", "u1");
+    
+    Collection<JobInProgress> jobs = scheduler.getJobs("default");
+    assertEquals(2, jobs.size());
+    Iterator<JobInProgress> iter = jobs.iterator();
+    assertEquals(j1, iter.next());
+    assertEquals(j2, iter.next());
+    
+    assertEquals(1, scheduler.jobQueuesManager.
+                        getRunningJobQueue("default").size());
+    assertEquals(1, scheduler.jobQueuesManager.
+                        getWaitingJobQueue("default").size());
+  }
+  
   // test capacity transfer
   // test capacity transfer
   public void testCapacityTransfer() throws Exception {
   public void testCapacityTransfer() throws Exception {
     // set up some queues
     // set up some queues