Преглед изворни кода

commit 29ef44b7d21d938d25db3f7db987bfbf66c3f769
Author: Hemanth Yamijala <yhemanth@apache.org>
Date: Thu Oct 15 11:58:38 2009 +0530

MAPREDUCE:979 from https://issues.apache.org/jira/secure/attachment/12421852/mapreduce-979-20-3.patch

+++ b/YAHOO-CHANGES.txt
+66. MAPREDUCE-979. Fixed JobConf APIs related to memory parameters to
+ return values of new configuration variables when deprecated
+ variables are disabled. (Sreekanth Ramakrishnan via yhemanth)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077018 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley пре 14 година
родитељ
комит
88c5adc18f

+ 75 - 43
src/mapred/org/apache/hadoop/mapred/JobConf.java

@@ -112,7 +112,8 @@ public class JobConf extends Configuration {
   }
 
   /**
-   * @deprecated
+   * @deprecated Use {@link #MAPRED_JOB_MAP_MEMORY_MB_PROPERTY} and
+   * {@link #MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY}
    */
   @Deprecated
   public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
@@ -1574,32 +1575,68 @@ public class JobConf extends Configuration {
     return get("job.local.dir");
   }
 
+  /**
+   * Get memory required to run a map task of the job, in MB.
+   * 
+   * If a value is specified in the configuration, it is returned.
+   * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
+   * <p/>
+   * For backward compatibility, if the job configuration sets the
+   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
+   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
+   * after converting it from bytes to MB.
+   * @return memory required to run a map task of the job, in MB,
+   *          or {@link #DISABLED_MEMORY_LIMIT} if unset.
+   */
   public long getMemoryForMapTask() {
-    if (get(MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
-      long val = getLong(
-        MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
-      return (val == DISABLED_MEMORY_LIMIT) ? val :
-        ((val < 0) ? DISABLED_MEMORY_LIMIT : val / (1024 * 1024));
+    long value = getDeprecatedMemoryValue();
+    if (value == DISABLED_MEMORY_LIMIT) {
+      value = normalizeMemoryConfigValue(
+                getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
+                          DISABLED_MEMORY_LIMIT));
     }
-    return getLong(
-      JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
-      DISABLED_MEMORY_LIMIT);
+    return value;
   }
 
   public void setMemoryForMapTask(long mem) {
     setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
   }
 
+  /**
+   * Get memory required to run a reduce task of the job, in MB.
+   * 
+   * If a value is specified in the configuration, it is returned.
+   * Else, it returns {@link #DISABLED_MEMORY_LIMIT}.
+   * <p/>
+   * For backward compatibility, if the job configuration sets the
+   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
+   * from {@link #DISABLED_MEMORY_LIMIT}, that value will be used
+   * after converting it from bytes to MB.
+   * @return memory required to run a reduce task of the job, in MB,
+   *          or {@link #DISABLED_MEMORY_LIMIT} if unset.
+   */
   public long getMemoryForReduceTask() {
-    if (get(MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
-      long val = getLong(
-        MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
-      return (val == DISABLED_MEMORY_LIMIT) ? val :
-        ((val < 0) ? DISABLED_MEMORY_LIMIT : val / (1024 * 1024));
+    long value = getDeprecatedMemoryValue();
+    if (value == DISABLED_MEMORY_LIMIT) {
+      value = normalizeMemoryConfigValue(
+                getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
+                        DISABLED_MEMORY_LIMIT));
     }
-    return getLong(
-      JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
-      DISABLED_MEMORY_LIMIT);
+    return value;
+  }
+  
+  // Return the value set to the key MAPRED_TASK_MAXVMEM_PROPERTY,
+  // converted into MBs.
+  // Returns DISABLED_MEMORY_LIMIT if unset, or set to a negative
+  // value.
+  private long getDeprecatedMemoryValue() {
+    long oldValue = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, 
+        DISABLED_MEMORY_LIMIT);
+    oldValue = normalizeMemoryConfigValue(oldValue);
+    if (oldValue != DISABLED_MEMORY_LIMIT) {
+      oldValue /= (1024*1024);
+    }
+    return oldValue;
   }
 
   public void setMemoryForReduceTask(long mem) {
@@ -1704,18 +1741,21 @@ public class JobConf extends Configuration {
 
 
   /**
-   * The maximum amount of memory any task of this job will use. See
+   * Get the memory required to run a task of this job, in bytes. See
    * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
    * <p/>
-   * mapred.task.maxvmem is split into
-   * mapred.job.map.memory.mb
-   * and mapred.job.map.memory.mb,mapred
-   * each of the new key are set
-   * as mapred.task.maxvmem / 1024
-   * as new values are in MB
-   *
-   * @return The maximum amount of memory any task of this job will use, in
-   *         bytes.
+   * This method is deprecated. Now, different memory limits can be
+   * set for map and reduce tasks of a job, in MB. 
+   * <p/>
+   * For backward compatibility, if the job configuration sets the
+   * key {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to a value different
+   * from {@link #DISABLED_MEMORY_LIMIT}, that value is returned. 
+   * Otherwise, this method will return the larger of the values returned by 
+   * {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()}
+   * after converting them into bytes.
+   * 
+   * @return Memory required to run a task of this job, in bytes,
+   *         or {@link #DISABLED_MEMORY_LIMIT}, if unset.
    * @see #setMaxVirtualMemoryForTask(long)
    * @deprecated Use {@link #getMemoryForMapTask()} and
    *             {@link #getMemoryForReduceTask()}
@@ -1726,24 +1766,16 @@ public class JobConf extends Configuration {
       "getMaxVirtualMemoryForTask() is deprecated. " +
       "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
 
-    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
-      if (get(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY) != null || get(
-        JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY) != null) {
-        long val = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
-        if (val == JobConf.DISABLED_MEMORY_LIMIT) {
-          return val;
-        } else {
-          if (val < 0) {
-            return JobConf.DISABLED_MEMORY_LIMIT;
-          }
-          return val * 1024 * 1024;
-          //Convert MB to byte as new value is in
-          // MB and old deprecated method returns bytes
-        }
+    long value = getLong(MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+    value = normalizeMemoryConfigValue(value);
+    if (value == DISABLED_MEMORY_LIMIT) {
+      value = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
+      value = normalizeMemoryConfigValue(value);
+      if (value != DISABLED_MEMORY_LIMIT) {
+        value *= 1024*1024;
       }
     }
-
-    return getLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+    return value;
   }
 
   /**

+ 41 - 3
src/test/org/apache/hadoop/conf/TestJobConf.java

@@ -70,9 +70,9 @@ public class TestJobConf extends TestCase {
     configuration = new JobConf();
     configuration.set("mapred.task.maxvmem" , "-1");
     configuration.set("mapred.job.map.memory.mb",String.valueOf(300));
-    configuration.set("mapred.job.reduce.memory.mb",String.valueOf(300));
-    Assert.assertEquals(configuration.getMemoryForMapTask(),-1);
-    Assert.assertEquals(configuration.getMemoryForReduceTask(),-1);
+    configuration.set("mapred.job.reduce.memory.mb",String.valueOf(400));
+    Assert.assertEquals(configuration.getMemoryForMapTask(), 300);
+    Assert.assertEquals(configuration.getMemoryForReduceTask(), 400);
 
     configuration = new JobConf();
     configuration.set("mapred.task.maxvmem" , String.valueOf(2*1024 * 1024));
@@ -90,9 +90,47 @@ public class TestJobConf extends TestCase {
 
     configuration = new JobConf();
     configuration.set("mapred.task.maxvmem" , String.valueOf(2*1024 * 1024));
+    configuration.set("mapred.job.map.memory.mb","3");
+    configuration.set("mapred.job.reduce.memory.mb","3");
     Assert.assertEquals(configuration.getMemoryForMapTask(),2);
     Assert.assertEquals(configuration.getMemoryForReduceTask(),2);
   }
+  
+  /**
+   * Test that negative values for MAPRED_TASK_MAXVMEM_PROPERTY cause
+   * new configuration keys' values to be used.
+   */
+  
+  public void testNegativeValueForTaskVmem() {
+    JobConf configuration = new JobConf();
+    
+    configuration.set(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, "-3");
+    configuration.set("mapred.job.map.memory.mb", "4");
+    configuration.set("mapred.job.reduce.memory.mb", "5");
+    Assert.assertEquals(4, configuration.getMemoryForMapTask());
+    Assert.assertEquals(5, configuration.getMemoryForReduceTask());
+    
+  }
+  
+  /**
+   * Test that negative values for all memory configuration properties causes
+   * APIs to disable memory limits
+   */
+  
+  public void testNegativeValuesForMemoryParams() {
+    JobConf configuration = new JobConf();
+    
+    configuration.set(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, "-4");
+    configuration.set("mapred.job.map.memory.mb", "-5");
+    configuration.set("mapred.job.reduce.memory.mb", "-6");
+    
+    Assert.assertEquals(JobConf.DISABLED_MEMORY_LIMIT,
+                        configuration.getMemoryForMapTask());
+    Assert.assertEquals(JobConf.DISABLED_MEMORY_LIMIT,
+                        configuration.getMemoryForReduceTask());
+    Assert.assertEquals(JobConf.DISABLED_MEMORY_LIMIT,
+                        configuration.getMaxVirtualMemoryForTask());
+  }
 
   /**
    *   Test deprecated accessor and mutator method for mapred.task.maxvmem