Prechádzať zdrojové kódy

HADOOP-4774. Fix default values of some capacity scheduler configuration items. Contributed by Sreekanth Ramakrishnan

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@727660 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 16 rokov pred
rodič
commit
5fb9f17279

+ 4 - 0
CHANGES.txt

@@ -471,6 +471,10 @@ Release 0.20.0 - Unreleased
     lib directory and added the same to test-patch.sh. (Giridharan Kesavan via
     lib directory and added the same to test-patch.sh. (Giridharan Kesavan via
     acmurthy)
     acmurthy)
 
 
+    HADOOP-4774. Fix default values of some capacity scheduler configuration
+    items which would otherwise not work on a fresh checkout.
+    (Sreekanth Ramakrishnan via yhemanth)
+
 Release 0.19.1 - Unreleased
 Release 0.19.1 - Unreleased
 
 
   IMPROVEMENTS
   IMPROVEMENTS

+ 23 - 4
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java

@@ -71,6 +71,18 @@ class CapacitySchedulerConf {
   static final String UPPER_LIMIT_ON_TASK_PMEM_PROPERTY =
   static final String UPPER_LIMIT_ON_TASK_PMEM_PROPERTY =
       "mapred.capacity-scheduler.task.limit.maxpmem";
       "mapred.capacity-scheduler.task.limit.maxpmem";
 
 
+  /**
+   * The constant which defines the default initialization thread
+   * polling interval, denoted in milliseconds.
+   */
+  private static final int INITIALIZATION_THREAD_POLLING_INTERVAL = 5000;
+
+  /**
+   * The constant which defines the maximum number of worker threads to be
+   * spawned off for job initialization
+   */
+  private static final int MAX_INITIALIZATION_WORKER_THREADS = 5;
+
   private Configuration rmConf;
   private Configuration rmConf;
 
 
   private int defaultMaxJobsPerUsersToInitialize;
   private int defaultMaxJobsPerUsersToInitialize;
@@ -312,15 +324,19 @@ class CapacitySchedulerConf {
   }
   }
 
 
   /**
   /**
-   * Amount of time in miliseconds which poller thread and initialization
+   * Amount of time in milliseconds which poller thread and initialization
    * thread would sleep before looking at the queued jobs.
    * thread would sleep before looking at the queued jobs.
+   * 
+   * The default value if no corresponding configuration is present is
+   * 5000 Milliseconds.
    *  
    *  
-   * @return time in miliseconds.
+   * @return time in milliseconds.
    * @throws IllegalArgumentException if time is negative or zero.
    * @throws IllegalArgumentException if time is negative or zero.
    */
    */
   public long getSleepInterval() {
   public long getSleepInterval() {
     long sleepInterval = rmConf.getLong(
     long sleepInterval = rmConf.getLong(
-        "mapred.capacity-scheduler.init-poll-interval", -1);
+        "mapred.capacity-scheduler.init-poll-interval", 
+        INITIALIZATION_THREAD_POLLING_INTERVAL);
     
     
     if(sleepInterval <= 0) {
     if(sleepInterval <= 0) {
       throw new IllegalArgumentException(
       throw new IllegalArgumentException(
@@ -342,12 +358,15 @@ class CapacitySchedulerConf {
    * So a given thread can have responsibility of initializing jobs from more 
    * So a given thread can have responsibility of initializing jobs from more 
    * than one queue.
    * than one queue.
    * 
    * 
+   * The default value is 5
+   * 
    * @return maximum number of threads spawned to initialize jobs in job queue
    * @return maximum number of threads spawned to initialize jobs in job queue
    * in parallel.
    * in parallel.
    */
    */
   public int getMaxWorkerThreads() {
   public int getMaxWorkerThreads() {
     int maxWorkerThreads = rmConf.getInt(
     int maxWorkerThreads = rmConf.getInt(
-        "mapred.capacity-scheduler.init-worker-threads", 0);
+        "mapred.capacity-scheduler.init-worker-threads", 
+        MAX_INITIALIZATION_WORKER_THREADS);
     if(maxWorkerThreads <= 0) {
     if(maxWorkerThreads <= 0) {
       throw new IllegalArgumentException(
       throw new IllegalArgumentException(
           "Invalid initializater worker thread number " + maxWorkerThreads);
           "Invalid initializater worker thread number " + maxWorkerThreads);

+ 49 - 9
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java

@@ -323,6 +323,55 @@ public class TestCapacitySchedulerConf extends TestCase {
     }
     }
   }
   }
   
   
+  public void testInitializationPollerProperties() 
+    throws Exception {
+    /*
+     * Test case to check properties of poller when no configuration file
+     * is present.
+     */
+    testConf = new CapacitySchedulerConf();
+    long pollingInterval = testConf.getSleepInterval();
+    int maxWorker = testConf.getMaxWorkerThreads();
+    assertTrue("Invalid polling interval ",pollingInterval > 0);
+    assertTrue("Invalid working thread pool size" , maxWorker > 0);
+    
+    //test case for custom values configured for initialization 
+    //poller.
+    openFile();
+    startConfig();
+    writeProperty("mapred.capacity-scheduler.init-worker-threads", "1");
+    writeProperty("mapred.capacity-scheduler.init-poll-interval", "1");
+    endConfig();
+    
+    testConf = new CapacitySchedulerConf(new Path(testConfFile));
+    
+    pollingInterval = testConf.getSleepInterval();
+    
+    maxWorker = testConf.getMaxWorkerThreads();
+    
+    assertEquals("Invalid polling interval ",pollingInterval ,1);
+    assertEquals("Invalid working thread pool size" , maxWorker, 1);
+    
+    //Test case for invalid values configured for initialization
+    //poller
+    openFile();
+    startConfig();
+    writeProperty("mapred.capacity-scheduler.init-worker-threads", "0");
+    writeProperty("mapred.capacity-scheduler.init-poll-interval", "0");
+    endConfig();
+    
+    testConf = new CapacitySchedulerConf(new Path(testConfFile));
+    
+    try {
+      pollingInterval = testConf.getSleepInterval();
+      fail("Polling interval configured is illegal");
+    } catch (IllegalArgumentException e) {}
+    try {
+      maxWorker = testConf.getMaxWorkerThreads();
+      fail("Max worker thread configured is illegal");
+    } catch (IllegalArgumentException e) {}
+  }
+  
   private void checkQueueProperties(
   private void checkQueueProperties(
                         CapacitySchedulerConf testConf,
                         CapacitySchedulerConf testConf,
                         Map<String, Map<String, String>> queueDetails) {
                         Map<String, Map<String, String>> queueDetails) {
@@ -396,15 +445,6 @@ public class TestCapacitySchedulerConf extends TestCase {
   }
   }
   
   
   
   
-  private void writeUserDefinedDefaultConfigurationWithoutGC() {
-    writeProperty("mapred.capacity-scheduler.default-reclaim-time-limit"
-        , "800");
-    writeProperty("mapred.capacity-scheduler.default-supports-priority"
-        , "true");
-    writeProperty("mapred.capacity-scheduler.default-minimum-user-limit-percent"
-        , "50");    
-  }
-  
   private void writeProperty(String name, String value) {
   private void writeProperty(String name, String value) {
     writer.println("<property>");
     writer.println("<property>");
     writer.println("<name> " + name + "</name>");
     writer.println("<name> " + name + "</name>");