Bläddra i källkod

Merge -r 728900:728901 from trunk to branch-0.20 to fix HADOOP-4854.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@728903 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 16 år sedan
förälder
incheckning
0bedee1288

+ 3 - 0
CHANGES.txt

@@ -488,6 +488,9 @@ Release 0.20.0 - Unreleased
 
     HADOOP-4924. Fixes a race condition in TaskTracker re-init. (ddas)
 
+    HADOOP-4854. Read reclaim capacity interval from capacity scheduler 
+    configuration. (Sreekanth Ramakrishnan via yhemanth)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

+ 10 - 0
conf/capacity-scheduler.xml.template

@@ -54,6 +54,16 @@
     </description>
   </property>
   
+  
+  <property>
+    <name>mapred.capacity-scheduler.reclaimCapacity.interval</name>
+    <value>5</value>
+    <description>The time interval, in seconds, between which the scheduler
+     periodically determines whether capacity needs to be reclaimed for 
+     any queue.
+    </description>
+  </property>
+  
   <!-- The default configuration settings for the capacity task scheduler -->
   <!-- The default values would be applied to all the queues which don't have -->
   <!-- the appropriate property for the particular queue -->

+ 26 - 0
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java

@@ -435,4 +435,30 @@ class CapacitySchedulerConf {
   public void setDefaultPercentOfPmemInVmem(float value) {
     rmConf.setFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, value);
   }
+  
+  /**
+   * Gets the reclaim capacity thread interval.
+   * 
+   * @return reclaim capacity interval
+   */
+
+  public long getReclaimCapacityInterval() {
+    long reclaimCapacityInterval = 
+      rmConf.getLong("mapred.capacity-scheduler.reclaimCapacity.interval", 5);
+    
+    if(reclaimCapacityInterval <= 0) {
+      throw new IllegalArgumentException("Invalid reclaim capacity " +
+      		"interval, should be greater than zero");
+    }
+    return reclaimCapacityInterval;
+  }
+  /**
+   * Sets the reclaim capacity thread interval.
+   * 
+   * @param value
+   */
+  public void setReclaimCapacityInterval(long value) {
+    rmConf.setLong("mapred.capacity-scheduler.reclaimCapacity.interval", 
+        value);
+  }
 }

+ 3 - 4
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -1163,16 +1163,15 @@ class CapacityTaskScheduler extends TaskScheduler {
   public synchronized void start() throws IOException {
     if (started) return;
     super.start();
-    RECLAIM_CAPACITY_INTERVAL = 
-      conf.getLong("mapred.capacity-scheduler.reclaimCapacity.interval", 5);
-    RECLAIM_CAPACITY_INTERVAL *= 1000;
-
     // initialize our queues from the config settings
     if (null == rmConf) {
       rmConf = new CapacitySchedulerConf();
     }
 
     initializeMemoryRelatedConf();
+    
+    RECLAIM_CAPACITY_INTERVAL = rmConf.getReclaimCapacityInterval();
+    RECLAIM_CAPACITY_INTERVAL *= 1000;
 
     // read queue info from config file
     QueueManager queueManager = taskTrackerManager.getQueueManager();

+ 16 - 2
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -511,6 +511,8 @@ public class TestCapacityScheduler extends TestCase {
       new LinkedHashMap<String, FakeQueueInfo>();
     String firstQueue;
     
+    private long reclaimCapacityInterval = 1000;
+    
     void setFakeQueues(List<FakeQueueInfo> queues) {
       for (FakeQueueInfo q: queues) {
         queueMap.put(q.queueName, q);
@@ -554,6 +556,16 @@ public class TestCapacityScheduler extends TestCase {
     public int getMaxWorkerThreads() {
       return 1;
     }
+    
+    @Override
+    public long getReclaimCapacityInterval() {
+      return reclaimCapacityInterval ;
+    }
+    
+    @Override
+    public void setReclaimCapacityInterval(long value) {
+      this.reclaimCapacityInterval = value;
+    }
   }
 
   protected class FakeClock extends CapacityTaskScheduler.Clock {
@@ -592,8 +604,6 @@ public class TestCapacityScheduler extends TestCase {
     scheduler.setTaskTrackerManager(taskTrackerManager);
 
     conf = new JobConf();
-    // set interval to a large number so thread doesn't interfere with us
-    conf.setLong("mapred.capacity-scheduler.reclaimCapacity.interval", 500);
     scheduler.setConf(conf);
     
   }
@@ -1174,6 +1184,7 @@ public class TestCapacityScheduler extends TestCase {
     queues.add(new FakeQueueInfo("q2", 25.0f, 1000, true, 25));
     queues.add(new FakeQueueInfo("q3", 25.0f, 1000, true, 25));
     resConf.setFakeQueues(queues);
+    resConf.setReclaimCapacityInterval(500);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -1216,6 +1227,7 @@ public class TestCapacityScheduler extends TestCase {
     queues.add(new FakeQueueInfo("q3", 20.0f, 1000, true, 25));
     queues.add(new FakeQueueInfo("q4", 10.0f, 1000, true, 25));
     resConf.setFakeQueues(queues);
+    resConf.setReclaimCapacityInterval(500);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
     
@@ -1272,6 +1284,7 @@ public class TestCapacityScheduler extends TestCase {
     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
     queues.add(new FakeQueueInfo("queue", 50.0f, 1000, true, 25));
     resConf.setFakeQueues(queues);
+    resConf.setReclaimCapacityInterval(500);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
     
@@ -1306,6 +1319,7 @@ public class TestCapacityScheduler extends TestCase {
     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
     resConf.setFakeQueues(queues);
+    resConf.setReclaimCapacityInterval(500);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 

+ 25 - 0
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java

@@ -372,6 +372,31 @@ public class TestCapacitySchedulerConf extends TestCase {
     } catch (IllegalArgumentException e) {}
   }
   
+  public void testInvalidReclaimCapacityInterval() throws IOException {
+    openFile();
+    startConfig();
+    Map<String, String> q1Props = setupQueueProperties(
+        new String[] { "guaranteed-capacity", 
+                       "reclaim-time-limit",
+                       "supports-priority",
+                       "minimum-user-limit-percent" }, 
+        new String[] { "-1", 
+                        "-800",
+                        "true", 
+                        "50" }
+                      );
+    writeQueueDetails("default", q1Props);
+    writeProperty("mapred.capacity-scheduler.reclaimCapacity.interval", "0");
+    endConfig();
+    try {
+      testConf = new CapacitySchedulerConf(new Path(testConfFile));
+      testConf.getReclaimCapacityInterval();
+      fail("Expect Invalid reclaim capacity interval raise Exception");
+    }catch(IllegalArgumentException e) {
+      assertTrue(true);
+    }
+  }
+  
   private void checkQueueProperties(
                         CapacitySchedulerConf testConf,
                         Map<String, Map<String, String>> queueDetails) {

+ 20 - 0
src/docs/src/documentation/content/xdocs/capacity_scheduler.xml

@@ -234,6 +234,26 @@
           </tr>
         </table>
       </section>
+      
+      <section>
+        <title>Configuring the capacity scheduler</title>
+        <p>The capacity scheduler's behavior can be controlled through the 
+          following properties. 
+        </p>
+        <table>
+          <tr>
+          <th>Name</th><th>Description</th>
+          </tr>
+          <tr>
+          <td>mapred.capacity-scheduler.reclaimCapacity.interval</td>
+          <td>The time interval, in seconds, between which the scheduler 
+          periodically determines whether capacity needs to be reclaimed for 
+          any queue. The default value is 5 seconds.
+          </td>
+          </tr>
+        </table>
+        
+      </section>
 
       <section>
         <title>Reviewing the configuration of the capacity scheduler</title>