瀏覽代碼

MAPREDUCE-5785. Derive heap size or mapreduce.*.memory.mb automatically. (Gera Shegalov and Karthik Kambatla via gera)

Gera Shegalov 10 年之前
父節點
當前提交
a003f71cac

+ 2 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -3,6 +3,8 @@ Hadoop MapReduce Change Log
 Trunk (Unreleased)
 Trunk (Unreleased)
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES
+    MAPREDUCE-5785. Derive heap size or mapreduce.*.memory.mb automatically.
+    (Gera Shegalov and Karthik Kambatla via gera)
 
 
   NEW FEATURES
   NEW FEATURES
 
 

+ 2 - 30
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java

@@ -27,6 +27,7 @@ import java.util.Vector;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskLog.LogName;
 import org.apache.hadoop.mapred.TaskLog.LogName;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -99,36 +100,7 @@ public class MapReduceChildJVM {
   }
   }
 
 
   private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {
   private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {
-    String userClasspath = "";
-    String adminClasspath = "";
-    if (isMapTask) {
-      userClasspath = 
-          jobConf.get(
-              JobConf.MAPRED_MAP_TASK_JAVA_OPTS, 
-              jobConf.get(
-                  JobConf.MAPRED_TASK_JAVA_OPTS, 
-                  JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)
-          );
-      adminClasspath = 
-          jobConf.get(
-              MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
-              MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
-    } else {
-      userClasspath =
-          jobConf.get(
-              JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, 
-              jobConf.get(
-                  JobConf.MAPRED_TASK_JAVA_OPTS,
-                  JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)
-              );
-      adminClasspath =
-          jobConf.get(
-              MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
-              MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
-    }
-    
-    // Add admin classpath first so it can be overridden by user.
-    return adminClasspath + " " + userClasspath;
+    return jobConf.getTaskJavaOpts(isMapTask ? TaskType.MAP : TaskType.REDUCE);
   }
   }
 
 
   public static List<String> getVMCommand(
   public static List<String> getVMCommand(

+ 2 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -563,19 +563,8 @@ public abstract class TaskAttemptImpl implements
     stateMachine = stateMachineFactory.make(this);
     stateMachine = stateMachineFactory.make(this);
   }
   }
 
 
-  private int getMemoryRequired(Configuration conf, TaskType taskType) {
-    int memory = 1024;
-    if (taskType == TaskType.MAP)  {
-      memory =
-          conf.getInt(MRJobConfig.MAP_MEMORY_MB,
-              MRJobConfig.DEFAULT_MAP_MEMORY_MB);
-    } else if (taskType == TaskType.REDUCE) {
-      memory =
-          conf.getInt(MRJobConfig.REDUCE_MEMORY_MB,
-              MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
-    }
-    
-    return memory;
+  private int getMemoryRequired(JobConf conf, TaskType taskType) {
+    return conf.getMemoryRequired(TypeConverter.fromYarn(taskType));
   }
   }
 
 
   private int getCpuRequired(Configuration conf, TaskType taskType) {
   private int getCpuRequired(Configuration conf, TaskType taskType) {

+ 82 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java

@@ -18,8 +18,10 @@
 
 
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 
+import java.util.ArrayList;
 import java.util.Map;
 import java.util.Map;
 
 
+import org.apache.hadoop.mapreduce.TaskType;
 import org.junit.Assert;
 import org.junit.Assert;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
@@ -56,8 +58,8 @@ public class TestMapReduceChildJVM {
     Assert.assertEquals(
     Assert.assertEquals(
       "[" + MRApps.crossPlatformify("JAVA_HOME") + "/bin/java" +
       "[" + MRApps.crossPlatformify("JAVA_HOME") + "/bin/java" +
       " -Djava.net.preferIPv4Stack=true" +
       " -Djava.net.preferIPv4Stack=true" +
-      " -Dhadoop.metrics.log.level=WARN" +
-      "  -Xmx200m -Djava.io.tmpdir=" + MRApps.crossPlatformify("PWD") + "/tmp" +
+      " -Dhadoop.metrics.log.level=WARN " +
+      "  -Xmx820m -Djava.io.tmpdir=" + MRApps.crossPlatformify("PWD") + "/tmp" +
       " -Dlog4j.configuration=container-log4j.properties" +
       " -Dlog4j.configuration=container-log4j.properties" +
       " -Dyarn.app.container.log.dir=<LOG_DIR>" +
       " -Dyarn.app.container.log.dir=<LOG_DIR>" +
       " -Dyarn.app.container.log.filesize=0" +
       " -Dyarn.app.container.log.filesize=0" +
@@ -67,7 +69,7 @@ public class TestMapReduceChildJVM {
       " attempt_0_0000_m_000000_0" +
       " attempt_0_0000_m_000000_0" +
       " 0" +
       " 0" +
       " 1><LOG_DIR>/stdout" +
       " 1><LOG_DIR>/stdout" +
-      " 2><LOG_DIR>/stderr ]", app.myCommandLine);
+      " 2><LOG_DIR>/stderr ]", app.launchCmdList.get(0));
     
     
     Assert.assertTrue("HADOOP_ROOT_LOGGER not set for job",
     Assert.assertTrue("HADOOP_ROOT_LOGGER not set for job",
       app.cmdEnvironment.containsKey("HADOOP_ROOT_LOGGER"));
       app.cmdEnvironment.containsKey("HADOOP_ROOT_LOGGER"));
@@ -119,8 +121,8 @@ public class TestMapReduceChildJVM {
     Assert.assertEquals(
     Assert.assertEquals(
         "[" + MRApps.crossPlatformify("JAVA_HOME") + "/bin/java" +
         "[" + MRApps.crossPlatformify("JAVA_HOME") + "/bin/java" +
             " -Djava.net.preferIPv4Stack=true" +
             " -Djava.net.preferIPv4Stack=true" +
-            " -Dhadoop.metrics.log.level=WARN" +
-            "  -Xmx200m -Djava.io.tmpdir=" + MRApps.crossPlatformify("PWD") + "/tmp" +
+            " -Dhadoop.metrics.log.level=WARN " +
+            "  -Xmx820m -Djava.io.tmpdir=" + MRApps.crossPlatformify("PWD") + "/tmp" +
             " -Dlog4j.configuration=container-log4j.properties" +
             " -Dlog4j.configuration=container-log4j.properties" +
             " -Dyarn.app.container.log.dir=<LOG_DIR>" +
             " -Dyarn.app.container.log.dir=<LOG_DIR>" +
             " -Dyarn.app.container.log.filesize=0" +
             " -Dyarn.app.container.log.filesize=0" +
@@ -134,7 +136,7 @@ public class TestMapReduceChildJVM {
             " attempt_0_0000_r_000000_0" +
             " attempt_0_0000_r_000000_0" +
             " 0" +
             " 0" +
             " 1><LOG_DIR>/stdout" +
             " 1><LOG_DIR>/stdout" +
-            " 2><LOG_DIR>/stderr ]", app.myCommandLine);
+            " 2><LOG_DIR>/stderr ]", app.launchCmdList.get(0));
 
 
     Assert.assertTrue("HADOOP_ROOT_LOGGER not set for job",
     Assert.assertTrue("HADOOP_ROOT_LOGGER not set for job",
         app.cmdEnvironment.containsKey("HADOOP_ROOT_LOGGER"));
         app.cmdEnvironment.containsKey("HADOOP_ROOT_LOGGER"));
@@ -161,8 +163,8 @@ public class TestMapReduceChildJVM {
     Assert.assertEquals(
     Assert.assertEquals(
       "[" + MRApps.crossPlatformify("JAVA_HOME") + "/bin/java" +
       "[" + MRApps.crossPlatformify("JAVA_HOME") + "/bin/java" +
       " -Djava.net.preferIPv4Stack=true" +
       " -Djava.net.preferIPv4Stack=true" +
-      " -Dhadoop.metrics.log.level=WARN" +
-      "  -Xmx200m -Djava.io.tmpdir=" + MRApps.crossPlatformify("PWD") + "/tmp" +
+      " -Dhadoop.metrics.log.level=WARN " +
+      "  -Xmx820m -Djava.io.tmpdir=" + MRApps.crossPlatformify("PWD") + "/tmp" +
       " -Dlog4j.configuration=" + testLogPropertieFile +
       " -Dlog4j.configuration=" + testLogPropertieFile +
       " -Dyarn.app.container.log.dir=<LOG_DIR>" +
       " -Dyarn.app.container.log.dir=<LOG_DIR>" +
       " -Dyarn.app.container.log.filesize=0" +
       " -Dyarn.app.container.log.filesize=0" +
@@ -172,12 +174,81 @@ public class TestMapReduceChildJVM {
       " attempt_0_0000_m_000000_0" +
       " attempt_0_0000_m_000000_0" +
       " 0" +
       " 0" +
       " 1><LOG_DIR>/stdout" +
       " 1><LOG_DIR>/stdout" +
-      " 2><LOG_DIR>/stderr ]", app.myCommandLine);
+      " 2><LOG_DIR>/stderr ]", app.launchCmdList.get(0));
+  }
+
+  @Test
+  public void testAutoHeapSizes() throws Exception {
+    // Don't specify heap size or memory-mb
+    testAutoHeapSize(-1, -1, null);
+
+    // Don't specify heap size
+    testAutoHeapSize(512, 768, null);
+    testAutoHeapSize(100, 768, null);
+    testAutoHeapSize(512, 100, null);
+    // Specify heap size
+    testAutoHeapSize(512, 768, "-Xmx100m");
+    testAutoHeapSize(512, 768, "-Xmx500m");
+
+    // Specify heap size but not the memory
+    testAutoHeapSize(-1, -1, "-Xmx100m");
+    testAutoHeapSize(-1, -1, "-Xmx500m");
+  }
+
+  private void testAutoHeapSize(int mapMb, int redMb, String xmxArg)
+      throws Exception {
+    JobConf conf = new JobConf();
+    float heapRatio = conf.getFloat(MRJobConfig.HEAP_MEMORY_MB_RATIO,
+        MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO);
+
+    // Verify map and reduce java opts are not set by default
+    Assert.assertNull("Default map java opts!",
+        conf.get(MRJobConfig.MAP_JAVA_OPTS));
+    Assert.assertNull("Default reduce java opts!",
+        conf.get(MRJobConfig.REDUCE_JAVA_OPTS));
+    // Set the memory-mbs and java-opts
+    if (mapMb > 0) {
+      conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMb);
+    } else {
+      mapMb = conf.getMemoryRequired(TaskType.MAP);
+    }
+
+    if (redMb > 0) {
+      conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, redMb);
+    } else {
+      redMb = conf.getMemoryRequired(TaskType.REDUCE);
+    }
+    if (xmxArg != null) {
+      conf.set(MRJobConfig.MAP_JAVA_OPTS, xmxArg);
+      conf.set(MRJobConfig.REDUCE_JAVA_OPTS, xmxArg);
+    }
+
+    // Submit job to let unspecified fields be picked up
+    MyMRApp app = new MyMRApp(1, 1, true, this.getClass().getName(), true);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+
+    // Go through the tasks and verify the values are as expected
+    for (String cmd : app.launchCmdList) {
+      final boolean isMap = cmd.contains("_m_");
+      int heapMb;
+      if (xmxArg == null) {
+        heapMb = (int)(Math.ceil((isMap ? mapMb : redMb) * heapRatio));
+      } else {
+        final String javaOpts = conf.get(isMap
+            ? MRJobConfig.MAP_JAVA_OPTS
+            : MRJobConfig.REDUCE_JAVA_OPTS);
+        heapMb = JobConf.parseMaximumHeapSizeMB(javaOpts);
+      }
+      Assert.assertEquals("Incorrect heapsize in the command opts",
+          heapMb, JobConf.parseMaximumHeapSizeMB(cmd));
+    }
   }
   }
 
 
   private static final class MyMRApp extends MRApp {
   private static final class MyMRApp extends MRApp {
 
 
-    private String myCommandLine;
+    private ArrayList<String> launchCmdList = new ArrayList<>();
     private Map<String, String> cmdEnvironment;
     private Map<String, String> cmdEnvironment;
 
 
     public MyMRApp(int maps, int reduces, boolean autoComplete,
     public MyMRApp(int maps, int reduces, boolean autoComplete,
@@ -196,7 +267,7 @@ public class TestMapReduceChildJVM {
                 launchEvent.getContainerLaunchContext();
                 launchEvent.getContainerLaunchContext();
             String cmdString = launchContext.getCommands().toString();
             String cmdString = launchContext.getCommands().toString();
             LOG.info("launchContext " + cmdString);
             LOG.info("launchContext " + cmdString);
-            myCommandLine = cmdString;
+            launchCmdList.add(cmdString);
             cmdEnvironment = launchContext.getEnvironment();
             cmdEnvironment = launchContext.getEnvironment();
           }
           }
           super.handle(event);
           super.handle(event);

+ 125 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java

@@ -20,8 +20,10 @@ package org.apache.hadoop.mapred;
 
 
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.regex.Pattern;
 
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -44,6 +46,7 @@ import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Credentials;
@@ -114,6 +117,8 @@ import org.apache.log4j.Level;
 public class JobConf extends Configuration {
 public class JobConf extends Configuration {
 
 
   private static final Log LOG = LogFactory.getLog(JobConf.class);
   private static final Log LOG = LogFactory.getLog(JobConf.class);
+  private static final Pattern JAVA_OPTS_XMX_PATTERN =
+          Pattern.compile(".*(?:^|\\s)-Xmx(\\d+)([gGmMkK]?)(?:$|\\s).*");
 
 
   static{
   static{
     ConfigUtil.loadResources();
     ConfigUtil.loadResources();
@@ -247,9 +252,9 @@ public class JobConf extends Configuration {
    */
    */
   public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 
   public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 
     JobContext.REDUCE_JAVA_OPTS;
     JobContext.REDUCE_JAVA_OPTS;
-  
-  public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
-  
+
+  public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "";
+
   /**
   /**
    * @deprecated
    * @deprecated
    * Configuration key to set the maximum virtual memory available to the child
    * Configuration key to set the maximum virtual memory available to the child
@@ -2022,7 +2027,123 @@ public class JobConf extends Configuration {
       LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
       LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
     }
     }
   }
   }
-  
+
+  private String getConfiguredTaskJavaOpts(TaskType taskType) {
+    String userClasspath = "";
+    String adminClasspath = "";
+    if (taskType == TaskType.MAP) {
+      userClasspath = get(MAPRED_MAP_TASK_JAVA_OPTS,
+          get(MAPRED_TASK_JAVA_OPTS, DEFAULT_MAPRED_TASK_JAVA_OPTS));
+      adminClasspath = get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
+          MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
+    } else {
+      userClasspath = get(MAPRED_REDUCE_TASK_JAVA_OPTS,
+          get(MAPRED_TASK_JAVA_OPTS, DEFAULT_MAPRED_TASK_JAVA_OPTS));
+      adminClasspath = get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
+          MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
+    }
+
+    return adminClasspath + " " + userClasspath;
+  }
+
+  @Private
+  public String getTaskJavaOpts(TaskType taskType) {
+    String javaOpts = getConfiguredTaskJavaOpts(taskType);
+
+    if (!javaOpts.contains("-Xmx")) {
+      float heapRatio = getFloat(MRJobConfig.HEAP_MEMORY_MB_RATIO,
+          MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO);
+
+      if (heapRatio > 1.0f || heapRatio < 0) {
+        LOG.warn("Invalid value for " + MRJobConfig.HEAP_MEMORY_MB_RATIO
+            + ", using the default.");
+        heapRatio = MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO;
+      }
+
+      int taskContainerMb = getMemoryRequired(taskType);
+      int taskHeapSize = (int)Math.ceil(taskContainerMb * heapRatio);
+
+      String xmxArg = String.format("-Xmx%dm", taskHeapSize);
+      LOG.info("Task java-opts do not specify heap size. Setting task attempt" +
+          " jvm max heap size to " + xmxArg);
+
+      javaOpts += " " + xmxArg;
+    }
+
+    return javaOpts;
+  }
+
+  /**
+   * Parse the Maximum heap size from the java opts as specified by the -Xmx option
+   * Format: -Xmx<size>[g|G|m|M|k|K]
+   * @param javaOpts String to parse to read maximum heap size
+   * @return Maximum heap size in MB or -1 if not specified
+   */
+  @Private
+  @VisibleForTesting
+  public static int parseMaximumHeapSizeMB(String javaOpts) {
+    // Find the last matching -Xmx following word boundaries
+    Matcher m = JAVA_OPTS_XMX_PATTERN.matcher(javaOpts);
+    if (m.matches()) {
+      int size = Integer.parseInt(m.group(1));
+      if (size <= 0) {
+        return -1;
+      }
+      if (m.group(2).isEmpty()) {
+        // -Xmx specified in bytes
+        return size / (1024 * 1024);
+      }
+      char unit = m.group(2).charAt(0);
+      switch (unit) {
+        case 'g':
+        case 'G':
+          // -Xmx specified in GB
+          return size * 1024;
+        case 'm':
+        case 'M':
+          // -Xmx specified in MB
+          return size;
+        case 'k':
+        case 'K':
+          // -Xmx specified in KB
+          return size / 1024;
+      }
+    }
+    // -Xmx not specified
+    return -1;
+  }
+
+  private int getMemoryRequiredHelper(
+      String configName, int defaultValue, int heapSize, float heapRatio) {
+    int memory = getInt(configName, -1);
+    if (memory <= 0) {
+      if (heapSize > 0) {
+        memory = (int) Math.ceil(heapSize / heapRatio);
+        LOG.info("Figured value for " + configName + " from javaOpts");
+      } else {
+        memory = defaultValue;
+      }
+    }
+
+    return memory;
+  }
+
+  @Private
+  public int getMemoryRequired(TaskType taskType) {
+    int memory = 1024;
+    int heapSize = parseMaximumHeapSizeMB(getConfiguredTaskJavaOpts(taskType));
+    float heapRatio = getFloat(MRJobConfig.HEAP_MEMORY_MB_RATIO,
+        MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO);
+    if (taskType == TaskType.MAP) {
+      return getMemoryRequiredHelper(MRJobConfig.MAP_MEMORY_MB,
+          MRJobConfig.DEFAULT_MAP_MEMORY_MB, heapSize, heapRatio);
+    } else if (taskType == TaskType.REDUCE) {
+      return getMemoryRequiredHelper(MRJobConfig.REDUCE_MEMORY_MB,
+          MRJobConfig.DEFAULT_REDUCE_MEMORY_MB, heapSize, heapRatio);
+    } else {
+      return memory;
+    }
+  }
 
 
 }
 }
 
 

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java

@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 
 import javax.crypto.SecretKey;
 import javax.crypto.SecretKey;
 
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -624,8 +625,9 @@ abstract public class Task implements Writable, Configurable {
      * Using AtomicBoolean since we need an atomic read & reset method. 
      * Using AtomicBoolean since we need an atomic read & reset method. 
      */  
      */  
     private AtomicBoolean progressFlag = new AtomicBoolean(false);
     private AtomicBoolean progressFlag = new AtomicBoolean(false);
-    
-    TaskReporter(Progress taskProgress,
+
+    @VisibleForTesting
+    public TaskReporter(Progress taskProgress,
                  TaskUmbilicalProtocol umbilical) {
                  TaskUmbilicalProtocol umbilical) {
       this.umbilical = umbilical;
       this.umbilical = umbilical;
       this.taskProgress = taskProgress;
       this.taskProgress = taskProgress;

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -810,6 +810,11 @@ public interface MRJobConfig {
   public static final String TASK_PREEMPTION =
   public static final String TASK_PREEMPTION =
       "mapreduce.job.preemption";
       "mapreduce.job.preemption";
 
 
+  public static final String HEAP_MEMORY_MB_RATIO =
+      "mapreduce.job.heap.memory-mb.ratio";
+
+  public static final float DEFAULT_HEAP_MEMORY_MB_RATIO = 0.8f;
+
   public static final String MR_ENCRYPTED_INTERMEDIATE_DATA =
   public static final String MR_ENCRYPTED_INTERMEDIATE_DATA =
       "mapreduce.job.encrypted-intermediate-data";
       "mapreduce.job.encrypted-intermediate-data";
   public static final boolean DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA = false;
   public static final boolean DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA = false;

+ 30 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -208,9 +208,11 @@
 
 
 <property>
 <property>
   <name>mapreduce.map.memory.mb</name>
   <name>mapreduce.map.memory.mb</name>
-  <value>1024</value>
+  <value>-1</value>
   <description>The amount of memory to request from the scheduler for each
   <description>The amount of memory to request from the scheduler for each
-  map task.
+    map task. If this is not specified or is non-positive, it is inferred from
+    mapreduce.map.java.opts and mapreduce.job.heap.memory-mb.ratio.
+    If java-opts are also not specified, we set it to 1024.
   </description>
   </description>
 </property>
 </property>
 
 
@@ -224,9 +226,11 @@
 
 
 <property>
 <property>
   <name>mapreduce.reduce.memory.mb</name>
   <name>mapreduce.reduce.memory.mb</name>
-  <value>1024</value>
+  <value>-1</value>
   <description>The amount of memory to request from the scheduler for each
   <description>The amount of memory to request from the scheduler for each
-  reduce task.
+    reduce task. If this is not specified or is non-positive, it is inferred
+    from mapreduce.reduce.java.opts and mapreduce.job.heap.memory-mb.ratio.
+    If java-opts are also not specified, we set it to 1024.
   </description>
   </description>
 </property>
 </property>
 
 
@@ -240,7 +244,7 @@
 
 
 <property>
 <property>
   <name>mapred.child.java.opts</name>
   <name>mapred.child.java.opts</name>
-  <value>-Xmx200m</value>
+  <value></value>
   <description>Java opts for the task processes.
   <description>Java opts for the task processes.
   The following symbol, if present, will be interpolated: @taskid@ is replaced 
   The following symbol, if present, will be interpolated: @taskid@ is replaced 
   by current TaskID. Any other occurrences of '@' will go unchanged.
   by current TaskID. Any other occurrences of '@' will go unchanged.
@@ -251,7 +255,10 @@
   Usage of -Djava.library.path can cause programs to no longer function if
   Usage of -Djava.library.path can cause programs to no longer function if
   hadoop native libraries are used. These values should instead be set as part 
   hadoop native libraries are used. These values should instead be set as part 
   of LD_LIBRARY_PATH in the map / reduce JVM env using the mapreduce.map.env and 
   of LD_LIBRARY_PATH in the map / reduce JVM env using the mapreduce.map.env and 
-  mapreduce.reduce.env config settings. 
+  mapreduce.reduce.env config settings.
+
+  If -Xmx is not set, it is inferred from mapreduce.{map|reduce}.memory.mb and
+  mapreduce.job.heap.memory-mb.ratio.
   </description>
   </description>
 </property>
 </property>
 
 
@@ -260,7 +267,9 @@
   <name>mapreduce.map.java.opts</name>
   <name>mapreduce.map.java.opts</name>
   <value></value>
   <value></value>
   <description>Java opts only for the child processes that are maps. If set,
   <description>Java opts only for the child processes that are maps. If set,
-  this will be used instead of mapred.child.java.opts.
+  this will be used instead of mapred.child.java.opts. If -Xmx is not set,
+  it is inferred from mapreduce.map.memory.mb and
+  mapreduce.job.heap.memory-mb.ratio.
   </description>
   </description>
 </property>
 </property>
 -->
 -->
@@ -270,7 +279,9 @@
   <name>mapreduce.reduce.java.opts</name>
   <name>mapreduce.reduce.java.opts</name>
   <value></value>
   <value></value>
   <description>Java opts only for the child processes that are reduces. If set,
   <description>Java opts only for the child processes that are reduces. If set,
-  this will be used instead of mapred.child.java.opts.
+  this will be used instead of mapred.child.java.opts. If -Xmx is not set,
+  it is inferred from mapreduce.reduce.memory.mb and
+  mapreduce.job.heap.memory-mb.ratio.
   </description>
   </description>
 </property>
 </property>
 -->
 -->
@@ -1567,4 +1578,15 @@
     - HTTPS_ONLY : Service is provided only on https
     - HTTPS_ONLY : Service is provided only on https
   </description>
   </description>
 </property>
 </property>
+
+<property>
+  <name>mapreduce.job.heap.memory-mb.ratio</name>
+  <value>0.8</value>
+  <description>The ratio of heap-size to container-size. If no -Xmx is
+    specified, it is calculated as
+    (mapreduce.{map|reduce}.memory.mb * mapreduce.heap.memory-mb.ratio).
+    If -Xmx is specified but not mapreduce.{map|reduce}.memory.mb, it is
+    calculated as (heapSize / mapreduce.heap.memory-mb.ratio).
+  </description>
+</property>
 </configuration>
 </configuration>