Explorar o código

MAPREDUCE-40. Keep memory management backwards compatible for job configuration parameters and limits. Contributed by Rahul Kumar Singh.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20@800309 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala %!s(int64=16) %!d(string=hai) anos
pai
achega
e4cee4b6fa

+ 3 - 0
CHANGES.txt

@@ -193,6 +193,9 @@ Release 0.20.1 - Unreleased
     MAPREDUCE-383. Fix a bug in Pipes combiner due to bytes count not 
     getting reset after the spill. (Christian Kunz via sharad)
 
+    MAPREDUCE-40. Keep memory management backwards compatible for job 
+    configuration parameters and limits. (Rahul Kumar Singh via yhemanth)
+   
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

+ 9 - 3
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java

@@ -49,13 +49,16 @@ class CapacitySchedulerConf {
    * {@link JobConf#setMaxVirtualMemoryForTask()}. This property thus provides
    * default value of physical memory for job's that don't explicitly specify
    * physical memory requirements.
-   * 
+   * <p/>
    * It defaults to {@link JobConf#DISABLED_MEMORY_LIMIT} and if not explicitly
    * set to a valid value, scheduler will not consider physical memory for
    * scheduling even if virtual memory based scheduling is enabled.
+   *
+   * @deprecated
    */
+  @Deprecated
   static String DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY =
-      "mapred.capacity-scheduler.task.default-pmem-percentage-in-vmem";
+    "mapred.capacity-scheduler.task.default-pmem-percentage-in-vmem";
 
   /**
    * Configuration that provides an upper limit on the maximum physical memory
@@ -65,9 +68,12 @@ class CapacitySchedulerConf {
    * by the scheduler. If it is set to {@link JobConf#DISABLED_MEMORY_LIMIT},
    * scheduler will not consider physical memory for scheduling even if virtual
    * memory based scheduling is enabled.
+   *
+   * @deprecated
    */
+  @Deprecated
   static final String UPPER_LIMIT_ON_TASK_PMEM_PROPERTY =
-      "mapred.capacity-scheduler.task.limit.maxpmem";
+    "mapred.capacity-scheduler.task.limit.maxpmem";
 
   /**
    * The constant which defines the default initialization thread

+ 53 - 5
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -770,6 +770,29 @@ class CapacityTaskScheduler extends TaskScheduler {
   }
 
   private void initializeMemoryRelatedConf() {
+    //handling @deprecated
+    if (conf.get(
+      CapacitySchedulerConf.DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY) !=
+      null) {
+      LOG.warn(
+        JobConf.deprecatedString(
+          CapacitySchedulerConf.DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY));
+    }
+
+    //handling @deprecated
+    if (conf.get(CapacitySchedulerConf.UPPER_LIMIT_ON_TASK_PMEM_PROPERTY) !=
+      null) {
+      LOG.warn(
+        JobConf.deprecatedString(
+          CapacitySchedulerConf.UPPER_LIMIT_ON_TASK_PMEM_PROPERTY));
+    }
+
+    if (conf.get(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY) != null) {
+      LOG.warn(
+        JobConf.deprecatedString(
+          JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY));
+    }
+
     memSizeForMapSlotOnJT =
         JobConf.normalizeMemoryConfigValue(conf.getLong(
             JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
@@ -778,14 +801,39 @@ class CapacityTaskScheduler extends TaskScheduler {
         JobConf.normalizeMemoryConfigValue(conf.getLong(
             JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
             JobConf.DISABLED_MEMORY_LIMIT));
-    limitMaxMemForMapTasks =
-        JobConf.normalizeMemoryConfigValue(conf.getLong(
+
+    //handling @deprecated values
+    if (conf.get(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY) != null) {
+      LOG.warn(
+        JobConf.deprecatedString(
+          JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY)+
+          " instead use " +JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY+
+          " and " + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY
+      );
+      
+      limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
+        JobConf.normalizeMemoryConfigValue(
+          conf.getLong(
+            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+      if (limitMaxMemForMapTasks != JobConf.DISABLED_MEMORY_LIMIT &&
+        limitMaxMemForMapTasks >= 0) {
+        limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
+          limitMaxMemForMapTasks /
+            (1024 * 1024); //Converting old values in bytes to MB
+      }
+    } else {
+      limitMaxMemForMapTasks =
+        JobConf.normalizeMemoryConfigValue(
+          conf.getLong(
             JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
             JobConf.DISABLED_MEMORY_LIMIT));
-    limitMaxMemForReduceTasks =
-        JobConf.normalizeMemoryConfigValue(conf.getLong(
+      limitMaxMemForReduceTasks =
+        JobConf.normalizeMemoryConfigValue(
+          conf.getLong(
             JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
             JobConf.DISABLED_MEMORY_LIMIT));
+    }
     LOG.info(String.format("Scheduler configured with "
         + "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT, "
         + "limitMaxMemForMapTasks, limitMaxMemForReduceTasks)"
@@ -848,7 +896,7 @@ class CapacityTaskScheduler extends TaskScheduler {
       }else {
         totalCapacity += capacity;
       }
-      int ulMin = schedConf.getMinimumUserLimitPercent(queueName); 
+      int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
       // create our QSI and add to our hashmap
       QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, capacity, 
                                                     ulMin, jobQueuesManager);

+ 19 - 1
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -45,7 +45,7 @@ public class TestCapacityScheduler extends TestCase {
       LogFactory.getLog(org.apache.hadoop.mapred.TestCapacityScheduler.class);
 
   private static int jobCounter;
-  
+
   /**
    * Test class that removes the asynchronous nature of job initialization.
    * 
@@ -2789,4 +2789,22 @@ public class TestCapacityScheduler extends TestCase {
       i++;
     }
   }
+
+  public void testDeprecatedMemoryValues() throws IOException {
+    // 2 map and 1 reduce slots
+    taskTrackerManager.addQueues(new String[] { "default" });
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));  
+    resConf.setFakeQueues(queues);
+    JobConf conf = (JobConf)(scheduler.getConf());
+    conf.set(
+      JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, String.valueOf(
+        1024 * 1024 * 3));
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    scheduler.setResourceManagerConf(resConf);    
+    scheduler.start();
+
+    assertEquals(scheduler.getLimitMaxMemForMapSlot(),3);
+    assertEquals(scheduler.getLimitMaxMemForReduceSlot(),3);
+  }
 }

+ 0 - 70
src/mapred/mapred-default.xml

@@ -165,76 +165,6 @@
   </description>
 </property>
 
-<property>
-  <name>mapred.tasktracker.vmem.reserved</name>
-  <value>-1</value>
-  <description>Configuration property to specify the amount of virtual memory
-    that has to be reserved by the TaskTracker for system usage (OS, TT etc).
-    The reserved virtual memory should be a part of the total virtual memory
-    available on the TaskTracker.
-    
-    The reserved virtual memory and the total virtual memory values are
-    reported by the TaskTracker as part of heart-beat so that they can
-    considered by a scheduler. Please refer to the documentation of the
-    configured scheduler to see how this property is used.
-    
-    These two values are also used by a TaskTracker for tracking tasks' memory
-    usage. Memory management functionality on a TaskTracker is disabled if this
-    property is set to -1, if it more than the total virtual memory on the 
-    tasktracker, or if either of the values is negative.
-  </description>
-</property>
-
-<property>
-  <name>mapred.task.limit.maxvmem</name>
-  <value>-1</value>
-  <description>
-    Cluster-wide configuration in bytes to be set by the site administrators
-    that provides an upper limit on the maximum virtual memory that can be
-    specified by a job via mapred.task.maxvmem. This has to be set on both the
-    JobTracker node for the sake of scheduling decisions and on the TaskTracker
-    nodes for the sake of memory management.
-    
-    The job configuration mapred.task.maxvmem should not be more than this
-    value, otherwise depending on the scheduler being configured, the job may
-    be rejected or the job configuration may just be ignored. Please refer to
-    the documentation of the configured scheduler to see how this property is
-    used.
-
-    If it is not set a TaskTracker, TaskTracker's memory management will be
-    disabled.
-  </description>
-</property>
-
-<property>
-  <name>mapred.task.maxvmem</name>
-  <value>-1</value>
-  <description>
-    The maximum amount of virtual memory any task of a job will use, in bytes.
-
-    This value will be used by TaskTrackers for monitoring the memory usage of
-    tasks of this jobs. If a TaskTracker's memory management functionality is
-    enabled, each task of this job will be allowed to use a maximum virtual
-    memory specified by this property. If the task's memory usage goes over 
-    this value, the task will be failed by the TT. If not set, the
-    cluster-wide configuration mapred.task.default.maxvmem is used as the
-    default value for memory requirements. If this property cascaded with
-    mapred.task.default.maxvmem becomes equal to -1, the job's tasks will
-    not be assured any particular amount of virtual memory and may be killed by
-    a TT that intends to control the total memory usage of the tasks via memory
-    management functionality. If the memory management functionality is
-    disabled on a TT, this value is ignored.
-
-    This value should not be more than the cluster-wide configuration
-    mapred.task.limit.maxvmem.
-
-    This value may be used by schedulers that support scheduling based on job's
-    memory requirements. Please refer to the documentation of the scheduler
-    being configured to see if it does memory based scheduling and if it does,
-    how this property is used by that scheduler.
-  </description>
-</property>
-
 <property>
   <name>mapred.tasktracker.memory_calculator_plugin</name>
   <value></value>

+ 168 - 9
src/mapred/org/apache/hadoop/mapred/JobConf.java

@@ -111,6 +111,34 @@ public class JobConf extends Configuration {
     Configuration.addDefaultResource("mapred-site.xml");
   }
 
+  /**
+   * @deprecated
+   */
+  @Deprecated
+  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
+    "mapred.task.maxvmem";
+
+  /**
+   * @deprecated 
+   */
+  @Deprecated
+  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
+    "mapred.task.limit.maxvmem";
+
+  /**
+   * @deprecated
+   */
+  @Deprecated
+  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
+    "mapred.task.default.maxvmem";
+
+  /**
+   * @deprecated
+   */
+  @Deprecated
+  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
+    "mapred.task.maxpmem";
+
   /**
    * A value which if set for memory related configuration options,
    * indicates that the options are turned off.
@@ -1367,7 +1395,8 @@ public class JobConf extends Configuration {
    * 
    * @param uri the job end notification uri
    * @see JobStatus
-   * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#JobCompletionAndChaining">Job Completion and Chaining</a>
+   * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#
+   *       JobCompletionAndChaining">Job Completion and Chaining</a>
    */
   public void setJobEndNotificationURI(String uri) {
     set("job.end.notification.url", uri);
@@ -1392,21 +1421,46 @@ public class JobConf extends Configuration {
     return get("job.local.dir");
   }
 
-  long getMemoryForMapTask() {
-    return getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
-        DISABLED_MEMORY_LIMIT);
+  public long getMemoryForMapTask() {
+    if (get(MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
+      LOG.warn(
+        JobConf.deprecatedString(
+          JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)+
+          " instead use  "+JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY + " and "
+          + JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
+
+      long val = getLong(
+        MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+      return (val == DISABLED_MEMORY_LIMIT) ? val :
+        ((val < 0) ? DISABLED_MEMORY_LIMIT : val / (1024 * 1024));
+    }
+    return getLong(
+      JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
+      DISABLED_MEMORY_LIMIT);
   }
 
-  void setMemoryForMapTask(long mem) {
+  public void setMemoryForMapTask(long mem) {
     setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
   }
 
-  long getMemoryForReduceTask() {
-    return getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
-        DISABLED_MEMORY_LIMIT);
+  public long getMemoryForReduceTask() {
+    if (get(MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
+      LOG.warn(
+        JobConf.deprecatedString(
+          JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)+
+        " instead use  "+JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY + " and "
+        + JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
+      long val = getLong(
+        MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+      return (val == DISABLED_MEMORY_LIMIT) ? val :
+        ((val < 0) ? DISABLED_MEMORY_LIMIT : val / (1024 * 1024));
+    }
+    return getLong(
+      JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
+      DISABLED_MEMORY_LIMIT);
   }
 
-  void setMemoryForReduceTask(long mem) {
+  public void setMemoryForReduceTask(long mem) {
     setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
   }
 
@@ -1472,5 +1526,110 @@ public class JobConf extends Configuration {
     }
     return null;
   }
+
+
+  /**
+   * The maximum amount of memory any task of this job will use. See
+   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
+   * <p/>
+   * mapred.task.maxvmem is split into
+   * mapred.job.map.memory.mb
+   * and mapred.job.map.memory.mb,mapred
+   * each of the new key are set
+   * as mapred.task.maxvmem / 1024
+   * as new values are in MB
+   *
+   * @return The maximum amount of memory any task of this job will use, in
+   *         bytes.
+   * @see #setMaxVirtualMemoryForTask(long)
+   * @deprecated Use {@link #getMemoryForMapTask()} and
+   *             {@link #getMemoryForReduceTask()}
+   */
+  @Deprecated
+  public long getMaxVirtualMemoryForTask() {
+    LOG.warn(
+      "getMaxVirtualMemoryForTask() is deprecated. " +
+      "Instead use getMemoryForMapTask() and getMemoryForReduceTask()");
+
+    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
+      if (get(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY) != null || get(
+        JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY) != null) {
+        long val = Math.max(getMemoryForMapTask(), getMemoryForReduceTask());
+        if (val == JobConf.DISABLED_MEMORY_LIMIT) {
+          return val;
+        } else {
+          if (val < 0) {
+            return JobConf.DISABLED_MEMORY_LIMIT;
+          }
+          return val * 1024 * 1024;
+          //Convert MB to byte as new value is in
+          // MB and old deprecated method returns bytes
+        }
+      }
+    }
+
+    return getLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+  }
+
+  /**
+   * Set the maximum amount of memory any task of this job can use. See
+   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
+   * <p/>
+   * mapred.task.maxvmem is split into
+   * mapred.job.map.memory.mb
+   * and mapred.job.map.memory.mb,mapred
+   * each of the new key are set
+   * as mapred.task.maxvmem / 1024
+   * as new values are in MB
+   *
+   * @param vmem Maximum amount of virtual memory in bytes any task of this job
+   *             can use.
+   * @see #getMaxVirtualMemoryForTask()
+   * @deprecated
+   *  Use {@link #setMemoryForMapTask(long mem)}  and
+   *  Use {@link #setMemoryForReduceTask(long mem)}
+   */
+  @Deprecated
+  public void setMaxVirtualMemoryForTask(long vmem) {
+    LOG.warn("setMaxVirtualMemoryForTask() is deprecated."+
+      "Instead use setMemoryForMapTask() and setMemoryForReduceTask()");
+    if(vmem != DISABLED_MEMORY_LIMIT && vmem < 0) {
+      setMemoryForMapTask(DISABLED_MEMORY_LIMIT);
+      setMemoryForReduceTask(DISABLED_MEMORY_LIMIT);
+    }
+
+    if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) == null) {
+      setMemoryForMapTask(vmem / (1024 * 1024)); //Changing bytes to mb
+      setMemoryForReduceTask(vmem / (1024 * 1024));//Changing bytes to mb
+    }else{
+      this.setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY,vmem);
+    }
+  }
+
+  /**
+   * @deprecated this variable is deprecated and nolonger in use.
+   */
+  @Deprecated
+  public long getMaxPhysicalMemoryForTask() {
+    LOG.warn("The API getMaxPhysicalMemoryForTask() is deprecated."
+              + " Refer to the APIs getMemoryForMapTask() and"
+              + " getMemoryForReduceTask() for details.");
+    return -1;
+  }
+
+  /*
+   * @deprecated this
+   */
+  @Deprecated
+  public void setMaxPhysicalMemoryForTask(long mem) {
+    LOG.warn("The API setMaxPhysicalMemoryForTask() is deprecated."
+        + " The value set is ignored. Refer to "
+        + " setMemoryForMapTask() and setMemoryForReduceTask() for details.");
+  }
+
+  static String deprecatedString(String key) {
+    return "The variable " + key + " is no longer used";
+  }
+
 }
 

+ 29 - 4
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -3681,14 +3681,39 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         JobConf.normalizeMemoryConfigValue(conf.getLong(
             JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
             JobConf.DISABLED_MEMORY_LIMIT));
-    limitMaxMemForMapTasks =
-        JobConf.normalizeMemoryConfigValue(conf.getLong(
+
+    if (conf.get(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY) != null) {
+      LOG.warn(
+        JobConf.deprecatedString(
+          JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY)+
+          " instead use "+JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY+
+          " and " + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY
+      );
+
+      limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
+        JobConf.normalizeMemoryConfigValue(
+          conf.getLong(
+            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+      if (limitMaxMemForMapTasks != JobConf.DISABLED_MEMORY_LIMIT &&
+        limitMaxMemForMapTasks >= 0) {
+        limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
+          limitMaxMemForMapTasks /
+            (1024 * 1024); //Converting old values in bytes to MB
+      }
+    } else {
+      limitMaxMemForMapTasks =
+        JobConf.normalizeMemoryConfigValue(
+          conf.getLong(
             JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
             JobConf.DISABLED_MEMORY_LIMIT));
-    limitMaxMemForReduceTasks =
-        JobConf.normalizeMemoryConfigValue(conf.getLong(
+      limitMaxMemForReduceTasks =
+        JobConf.normalizeMemoryConfigValue(
+          conf.getLong(
             JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
             JobConf.DISABLED_MEMORY_LIMIT));
+    }
+
     LOG.info(new StringBuilder().append("Scheduler configured with ").append(
         "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,").append(
         " limitMaxMemForMapTasks, limitMaxMemForReduceTasks) (").append(

+ 48 - 4
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -96,6 +96,19 @@ import org.apache.hadoop.util.Shell.ShellCommandExecutor;
  *******************************************************/
 public class TaskTracker 
              implements MRConstants, TaskUmbilicalProtocol, Runnable {
+  /**
+   * @deprecated
+   */
+  @Deprecated
+  static final String MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY =
+    "mapred.tasktracker.vmem.reserved";
+  /**
+   * @deprecated
+   */
+  @Deprecated
+  static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
+     "mapred.tasktracker.pmem.reserved";
+ 
   static final long WAIT_FOR_DONE = 3 * 1000;
   private int httpPort;
 
@@ -521,13 +534,15 @@ public class TaskTracker
     mapLauncher.start();
     reduceLauncher.start();
   }
-  
-  public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
+
+  public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
+    Configuration conf) {
     return conf.getClass("mapred.tasktracker.instrumentation",
         TaskTrackerMetricsInst.class, TaskTrackerInstrumentation.class);
   }
-  
-  public static void setInstrumentationClass(Configuration conf, Class<? extends TaskTrackerInstrumentation> t) {
+
+  public static void setInstrumentationClass(
+    Configuration conf, Class<? extends TaskTrackerInstrumentation> t) {
     conf.setClass("mapred.tasktracker.instrumentation",
         t, TaskTrackerInstrumentation.class);
   }
@@ -3002,6 +3017,35 @@ public class TaskTracker
    * Memory-related setup
    */
   private void initializeMemoryManagement() {
+
+    //handling @deprecated
+    if (fConf.get(MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY) != null) {
+      LOG.warn(
+        JobConf.deprecatedString(
+          MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY));
+    }
+
+    //handling @deprecated
+    if (fConf.get(MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY) != null) {
+      LOG.warn(
+        JobConf.deprecatedString(
+          MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY));
+    }
+
+    //handling @deprecated
+    if (fConf.get(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY) != null) {
+      LOG.warn(
+        JobConf.deprecatedString(
+          JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY));
+    }
+
+    //handling @deprecated
+    if (fConf.get(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY) != null) {
+      LOG.warn(
+        JobConf.deprecatedString(
+          JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY));
+    }
+
     Class<? extends MemoryCalculatorPlugin> clazz =
         fConf.getClass(MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
             null, MemoryCalculatorPlugin.class);

+ 89 - 0
src/test/org/apache/hadoop/conf/TestJobConf.java

@@ -49,4 +49,93 @@ public class TestJobConf extends TestCase {
     configuration.set("mapred.task.profile.params", "test");
     Assert.assertEquals("test", configuration.getProfileParams());
   }
+
+  /**
+   * Testing mapred.task.maxvmem replacement with new values
+   *
+   */
+  public void testMemoryConfigForMapOrReduceTask(){
+    JobConf configuration = new JobConf();
+    configuration.set("mapred.job.map.memory.mb",String.valueOf(300));
+    configuration.set("mapred.job.reduce.memory.mb",String.valueOf(300));
+    Assert.assertEquals(configuration.getMemoryForMapTask(),300);
+    Assert.assertEquals(configuration.getMemoryForReduceTask(),300);
+
+    configuration.set("mapred.task.maxvmem" , String.valueOf(2*1024 * 1024));
+    configuration.set("mapred.job.map.memory.mb",String.valueOf(300));
+    configuration.set("mapred.job.reduce.memory.mb",String.valueOf(300));
+    Assert.assertEquals(configuration.getMemoryForMapTask(),2);
+    Assert.assertEquals(configuration.getMemoryForReduceTask(),2);
+
+    configuration = new JobConf();
+    configuration.set("mapred.task.maxvmem" , "-1");
+    configuration.set("mapred.job.map.memory.mb",String.valueOf(300));
+    configuration.set("mapred.job.reduce.memory.mb",String.valueOf(300));
+    Assert.assertEquals(configuration.getMemoryForMapTask(),-1);
+    Assert.assertEquals(configuration.getMemoryForReduceTask(),-1);
+
+    configuration = new JobConf();
+    configuration.set("mapred.task.maxvmem" , String.valueOf(2*1024 * 1024));
+    configuration.set("mapred.job.map.memory.mb","-1");
+    configuration.set("mapred.job.reduce.memory.mb","-1");
+    Assert.assertEquals(configuration.getMemoryForMapTask(),2);
+    Assert.assertEquals(configuration.getMemoryForReduceTask(),2);
+
+    configuration = new JobConf();
+    configuration.set("mapred.task.maxvmem" , String.valueOf(-1));
+    configuration.set("mapred.job.map.memory.mb","-1");
+    configuration.set("mapred.job.reduce.memory.mb","-1");
+    Assert.assertEquals(configuration.getMemoryForMapTask(),-1);
+    Assert.assertEquals(configuration.getMemoryForReduceTask(),-1);    
+
+    configuration = new JobConf();
+    configuration.set("mapred.task.maxvmem" , String.valueOf(2*1024 * 1024));
+    Assert.assertEquals(configuration.getMemoryForMapTask(),2);
+    Assert.assertEquals(configuration.getMemoryForReduceTask(),2);
+  }
+
+  /**
+   *   Test deprecated accessor and mutator method for mapred.task.maxvmem
+   */
+  public void testMaxVirtualMemoryForTask() {
+    JobConf configuration = new JobConf();
+
+    //get test case
+    configuration.set("mapred.job.map.memory.mb", String.valueOf(300));
+    configuration.set("mapred.job.reduce.memory.mb", String.valueOf(-1));
+    Assert.assertEquals(
+      configuration.getMaxVirtualMemoryForTask(), 300 * 1024 * 1024);
+
+    configuration = new JobConf();
+    configuration.set("mapred.job.map.memory.mb", String.valueOf(-1));
+    configuration.set("mapred.job.reduce.memory.mb", String.valueOf(200));
+    Assert.assertEquals(
+      configuration.getMaxVirtualMemoryForTask(), 200 * 1024 * 1024);
+
+    configuration = new JobConf();
+    configuration.set("mapred.job.map.memory.mb", String.valueOf(-1));
+    configuration.set("mapred.job.reduce.memory.mb", String.valueOf(-1));
+    configuration.set("mapred.task.maxvmem", String.valueOf(1 * 1024 * 1024));
+    Assert.assertEquals(
+      configuration.getMaxVirtualMemoryForTask(), 1 * 1024 * 1024);
+
+    configuration = new JobConf();
+    configuration.set("mapred.task.maxvmem", String.valueOf(1 * 1024 * 1024));
+    Assert.assertEquals(
+      configuration.getMaxVirtualMemoryForTask(), 1 * 1024 * 1024);
+
+    //set test case
+
+    configuration = new JobConf();
+    configuration.setMaxVirtualMemoryForTask(2 * 1024 * 1024);
+    Assert.assertEquals(configuration.getMemoryForMapTask(), 2);
+    Assert.assertEquals(configuration.getMemoryForReduceTask(), 2);
+
+    configuration = new JobConf();   
+    configuration.set("mapred.job.map.memory.mb", String.valueOf(300));
+    configuration.set("mapred.job.reduce.memory.mb", String.valueOf(400));
+    configuration.setMaxVirtualMemoryForTask(2 * 1024 * 1024);
+    Assert.assertEquals(configuration.getMemoryForMapTask(), 2);
+    Assert.assertEquals(configuration.getMemoryForReduceTask(), 2);
+  }
 }