浏览代码

MAPREDUCE-5785. Derive heap size or mapreduce.*.memory.mb automatically. (Gera Shegalov and Karthik Kambatla via kasha)

Karthik Kambatla 10 年之前
父节点
当前提交
a4df9eed05

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -235,6 +235,9 @@ Release 2.7.0 - UNRELEASED
 
 
   IMPROVEMENTS
   IMPROVEMENTS
 
 
+    MAPREDUCE-5785. Derive heap size or mapreduce.*.memory.mb automatically. 
+    (Gera Shegalov and Karthik Kambatla via kasha)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     MAPREDUCE-6169. MergeQueue should release reference to the current item 
     MAPREDUCE-6169. MergeQueue should release reference to the current item 

+ 2 - 30
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskLog.LogName;
 import org.apache.hadoop.mapred.TaskLog.LogName;
 import org.apache.hadoop.mapreduce.ID;
 import org.apache.hadoop.mapreduce.ID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -115,36 +116,7 @@ public class MapReduceChildJVM {
   }
   }
 
 
   private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {
   private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {
-    String userClasspath = "";
-    String adminClasspath = "";
-    if (isMapTask) {
-      userClasspath = 
-          jobConf.get(
-              JobConf.MAPRED_MAP_TASK_JAVA_OPTS, 
-              jobConf.get(
-                  JobConf.MAPRED_TASK_JAVA_OPTS, 
-                  JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)
-          );
-      adminClasspath = 
-          jobConf.get(
-              MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
-              MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
-    } else {
-      userClasspath =
-          jobConf.get(
-              JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, 
-              jobConf.get(
-                  JobConf.MAPRED_TASK_JAVA_OPTS,
-                  JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)
-              );
-      adminClasspath =
-          jobConf.get(
-              MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
-              MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
-    }
-    
-    // Add admin classpath first so it can be overridden by user.
-    return adminClasspath + " " + userClasspath;
+    return jobConf.getTaskJavaOpts(isMapTask ? TaskType.MAP : TaskType.REDUCE);
   }
   }
 
 
   private static void setupLog4jProperties(Task task,
   private static void setupLog4jProperties(Task task,

+ 8 - 12
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -564,18 +564,14 @@ public abstract class TaskAttemptImpl implements
   }
   }
 
 
   private int getMemoryRequired(Configuration conf, TaskType taskType) {
   private int getMemoryRequired(Configuration conf, TaskType taskType) {
-    int memory = 1024;
-    if (taskType == TaskType.MAP)  {
-      memory =
-          conf.getInt(MRJobConfig.MAP_MEMORY_MB,
-              MRJobConfig.DEFAULT_MAP_MEMORY_MB);
-    } else if (taskType == TaskType.REDUCE) {
-      memory =
-          conf.getInt(MRJobConfig.REDUCE_MEMORY_MB,
-              MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
-    }
-    
-    return memory;
+    JobConf jobConf = conf instanceof JobConf
+        ? (JobConf) conf
+        : new JobConf(conf);
+
+    return jobConf.getMemoryRequired(
+        taskType == TaskType.MAP
+            ? org.apache.hadoop.mapreduce.TaskType.MAP
+            : org.apache.hadoop.mapreduce.TaskType.REDUCE);
   }
   }
 
 
   private int getCpuRequired(Configuration conf, TaskType taskType) {
   private int getCpuRequired(Configuration conf, TaskType taskType) {

+ 114 - 37
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java

@@ -20,8 +20,11 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 
 import java.util.Map;
 import java.util.Map;
 
 
+import org.apache.hadoop.mapreduce.TaskType;
 import org.junit.Assert;
 import org.junit.Assert;
 
 
+import java.util.ArrayList;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
@@ -43,6 +46,20 @@ public class TestMapReduceChildJVM {
 
 
   private static final Log LOG = LogFactory.getLog(TestMapReduceChildJVM.class);
   private static final Log LOG = LogFactory.getLog(TestMapReduceChildJVM.class);
 
 
+  private final String[] expectedContents = {
+      "[", MRApps.crossPlatformify("JAVA_HOME") + "/bin/java",
+      "-Djava.net.preferIPv4Stack=true",
+      "-Dhadoop.metrics.log.level=WARN",
+      "-Xmx820m",
+      "-Djava.io.tmpdir=" + MRApps.crossPlatformify("PWD") + "/tmp",
+      "-Dyarn.app.container.log.dir=<LOG_DIR>",
+      "-Dyarn.app.container.log.filesize=0",
+      "-Dhadoop.root.logger=INFO,CLA",
+      "org.apache.hadoop.mapred.YarnChild",
+      "127.0.0.1", "54321", "attempt_0_0000_m_000000_0",
+      "0", "1><LOG_DIR>/stdout",
+      "2><LOG_DIR>/stderr ]"};
+
   @Test (timeout = 30000)
   @Test (timeout = 30000)
   public void testCommandLine() throws Exception {
   public void testCommandLine() throws Exception {
 
 
@@ -53,22 +70,16 @@ public class TestMapReduceChildJVM {
     app.waitForState(job, JobState.SUCCEEDED);
     app.waitForState(job, JobState.SUCCEEDED);
     app.verifyCompleted();
     app.verifyCompleted();
 
 
-    Assert.assertEquals(
-      "[" + MRApps.crossPlatformify("JAVA_HOME") + "/bin/java" +
-      " -Djava.net.preferIPv4Stack=true" +
-      " -Dhadoop.metrics.log.level=WARN" +
-      "  -Xmx200m -Djava.io.tmpdir=" + MRApps.crossPlatformify("PWD") + "/tmp" +
-      " -Dlog4j.configuration=container-log4j.properties" +
-      " -Dyarn.app.container.log.dir=<LOG_DIR>" +
-      " -Dyarn.app.container.log.filesize=0" +
-      " -Dhadoop.root.logger=INFO,CLA" +
-      " org.apache.hadoop.mapred.YarnChild 127.0.0.1" +
-      " 54321" +
-      " attempt_0_0000_m_000000_0" +
-      " 0" +
-      " 1><LOG_DIR>/stdout" +
-      " 2><LOG_DIR>/stderr ]", app.myCommandLine);
-    
+    for (String content : expectedContents) {
+      Assert.assertTrue("Missing argument",
+          app.launchCmdList.get(0).contains(content));
+    }
+
+    // Check log4j
+    Assert.assertTrue("Missing argument",
+        app.launchCmdList.get(0).contains(
+            "-Dlog4j.configuration=container-log4j.properties"));
+
     Assert.assertTrue("HADOOP_ROOT_LOGGER not set for job",
     Assert.assertTrue("HADOOP_ROOT_LOGGER not set for job",
       app.cmdEnvironment.containsKey("HADOOP_ROOT_LOGGER"));
       app.cmdEnvironment.containsKey("HADOOP_ROOT_LOGGER"));
     Assert.assertEquals("INFO,console",
     Assert.assertEquals("INFO,console",
@@ -84,33 +95,99 @@ public class TestMapReduceChildJVM {
     MyMRApp app = new MyMRApp(1, 0, true, this.getClass().getName(), true);
     MyMRApp app = new MyMRApp(1, 0, true, this.getClass().getName(), true);
     Configuration conf = new Configuration();
     Configuration conf = new Configuration();
     conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
     conf.setBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, true);
-    String testLogPropertieFile = "test-log4j.properties";
-    String testLogPropertiePath = "../"+"test-log4j.properties";
-    conf.set(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, testLogPropertiePath);
+    String testLogPropertyFile = "test-log4j.properties";
+    String testLogPropertyPath = "../"+"test-log4j.properties";
+    conf.set(MRJobConfig.MAPREDUCE_JOB_LOG4J_PROPERTIES_FILE, testLogPropertyPath);
     Job job = app.submit(conf);
     Job job = app.submit(conf);
     app.waitForState(job, JobState.SUCCEEDED);
     app.waitForState(job, JobState.SUCCEEDED);
     app.verifyCompleted();
     app.verifyCompleted();
 
 
-    Assert.assertEquals(
-      "[" + MRApps.crossPlatformify("JAVA_HOME") + "/bin/java" +
-      " -Djava.net.preferIPv4Stack=true" +
-      " -Dhadoop.metrics.log.level=WARN" +
-      "  -Xmx200m -Djava.io.tmpdir=" + MRApps.crossPlatformify("PWD") + "/tmp" +
-      " -Dlog4j.configuration=" + testLogPropertieFile +
-      " -Dyarn.app.container.log.dir=<LOG_DIR>" +
-      " -Dyarn.app.container.log.filesize=0" +
-      " -Dhadoop.root.logger=INFO,CLA" +
-      " org.apache.hadoop.mapred.YarnChild 127.0.0.1" +
-      " 54321" +
-      " attempt_0_0000_m_000000_0" +
-      " 0" +
-      " 1><LOG_DIR>/stdout" +
-      " 2><LOG_DIR>/stderr ]", app.myCommandLine);
+    for (String content : expectedContents) {
+      Assert.assertTrue("Missing argument",
+          app.launchCmdList.get(0).contains(content));
+    }
+
+    // Check log4j
+    Assert.assertTrue("Missing argument",
+        app.launchCmdList.get(0).contains(
+            "-Dlog4j.configuration=" + testLogPropertyFile));
   }
   }
 
 
-  private static final class MyMRApp extends MRApp {
+  @Test
+  public void testAutoHeapSizes() throws Exception {
+    // Don't specify heap size or memory-mb
+    testAutoHeapSize(-1, -1, null);
+
+    // Don't specify heap size
+    testAutoHeapSize(512, 768, null);
+    testAutoHeapSize(100, 768, null);
+    testAutoHeapSize(512, 100, null);
+
+    // Specify heap size
+    testAutoHeapSize(512, 768, "-Xmx100m");
+    testAutoHeapSize(512, 768, "-Xmx500m");
+
+    // Specify heap size but not the memory
+    testAutoHeapSize(-1, -1, "-Xmx100m");
+    testAutoHeapSize(-1, -1, "-Xmx500m");
+  }
+
+  private void testAutoHeapSize(int mapMb, int redMb, String xmxArg)
+      throws Exception {
+    JobConf conf = new JobConf(new Configuration());
+    float heapRatio = conf.getFloat(MRJobConfig.HEAP_MEMORY_MB_RATIO,
+        MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO);
+
+    // Verify map and reduce java opts are not set by default
+    Assert.assertNull("Default map java opts!",
+        conf.get(MRJobConfig.MAP_JAVA_OPTS));
+    Assert.assertNull("Default reduce java opts!",
+        conf.get(MRJobConfig.REDUCE_JAVA_OPTS));
+
+    // Set the memory-mbs and java-opts
+    if (mapMb > 0) {
+      conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMb);
+    } else {
+      mapMb = conf.getMemoryRequired(TaskType.MAP);
+    }
 
 
-    private String myCommandLine;
+    if (redMb > 0) {
+      conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, redMb);
+    } else {
+      redMb = conf.getMemoryRequired(TaskType.REDUCE);
+    }
+
+    if (xmxArg != null) {
+      conf.set(MRJobConfig.MAP_JAVA_OPTS, xmxArg);
+      conf.set(MRJobConfig.REDUCE_JAVA_OPTS, xmxArg);
+    }
+
+    // Submit job to let unspecified fields be picked up
+    MyMRApp app = new MyMRApp(1, 1, true, this.getClass().getName(), true);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+
+    // Go through the tasks and verify the values are as expected
+    for (String cmd : app.launchCmdList) {
+      final boolean isMap = cmd.contains("_m_");
+      int heapMb;
+      if (xmxArg == null) {
+        heapMb = (int)(Math.ceil((isMap ? mapMb : redMb) * heapRatio));
+      } else {
+        final String javaOpts = conf.get(isMap
+            ? MRJobConfig.MAP_JAVA_OPTS
+            : MRJobConfig.REDUCE_JAVA_OPTS);
+        heapMb = JobConf.parseMaximumHeapSizeMB(javaOpts);
+      }
+
+      Assert.assertEquals("Incorrect heapsize in the command opts",
+          heapMb, JobConf.parseMaximumHeapSizeMB(cmd));
+    }
+  }
+
+  private static final class MyMRApp extends MRApp {
+    final ArrayList<String> launchCmdList = new ArrayList<String>();
     private Map<String, String> cmdEnvironment;
     private Map<String, String> cmdEnvironment;
 
 
     public MyMRApp(int maps, int reduces, boolean autoComplete,
     public MyMRApp(int maps, int reduces, boolean autoComplete,
@@ -129,7 +206,7 @@ public class TestMapReduceChildJVM {
                 launchEvent.getContainerLaunchContext();
                 launchEvent.getContainerLaunchContext();
             String cmdString = launchContext.getCommands().toString();
             String cmdString = launchContext.getCommands().toString();
             LOG.info("launchContext " + cmdString);
             LOG.info("launchContext " + cmdString);
-            myCommandLine = cmdString;
+            launchCmdList.add(cmdString);
             cmdEnvironment = launchContext.getEnvironment();
             cmdEnvironment = launchContext.getEnvironment();
           }
           }
           super.handle(event);
           super.handle(event);

+ 121 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java

@@ -20,8 +20,10 @@ package org.apache.hadoop.mapred;
 
 
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.regex.Pattern;
 
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -44,6 +46,7 @@ import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Credentials;
@@ -114,6 +117,8 @@ import org.apache.log4j.Level;
 public class JobConf extends Configuration {
 public class JobConf extends Configuration {
 
 
   private static final Log LOG = LogFactory.getLog(JobConf.class);
   private static final Log LOG = LogFactory.getLog(JobConf.class);
+  private static final Pattern JAVA_OPTS_XMX_PATTERN =
+          Pattern.compile(".*(?:^|\\s)-Xmx(\\d+)([gGmMkK]?)(?:$|\\s).*");
 
 
   static{
   static{
     ConfigUtil.loadResources();
     ConfigUtil.loadResources();
@@ -247,9 +252,7 @@ public class JobConf extends Configuration {
    */
    */
   public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 
   public static final String MAPRED_REDUCE_TASK_JAVA_OPTS = 
     JobContext.REDUCE_JAVA_OPTS;
     JobContext.REDUCE_JAVA_OPTS;
-  
-  public static final String DEFAULT_MAPRED_TASK_JAVA_OPTS = "-Xmx200m";
-  
+
   /**
   /**
    * @deprecated
    * @deprecated
    * Configuration key to set the maximum virtual memory available to the child
    * Configuration key to set the maximum virtual memory available to the child
@@ -2022,7 +2025,121 @@ public class JobConf extends Configuration {
       LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
       LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_REDUCE_TASK_ULIMIT));
     }
     }
   }
   }
-  
+
+  private String getConfiguredTaskJavaOpts(TaskType taskType) {
+    String userClasspath = "";
+    String adminClasspath = "";
+    if (taskType == TaskType.MAP) {
+      userClasspath = get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS,
+          get(JobConf.MAPRED_TASK_JAVA_OPTS));
+      adminClasspath = get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
+          MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
+    } else {
+      userClasspath = get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS,
+          get(JobConf.MAPRED_TASK_JAVA_OPTS));
+      adminClasspath = get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
+          MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
+    }
+
+    // Add admin classpath first so it can be overridden by user.
+    return adminClasspath + " " + userClasspath;
+  }
+
+  @Private
+  public String getTaskJavaOpts(TaskType taskType) {
+    String javaOpts = getConfiguredTaskJavaOpts(taskType);
+
+    if (!javaOpts.contains("-Xmx")) {
+      float heapRatio = getFloat(MRJobConfig.HEAP_MEMORY_MB_RATIO,
+          MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO);
+
+      if (heapRatio > 1.0f || heapRatio < 0) {
+        LOG.warn("Invalid value for " + MRJobConfig.HEAP_MEMORY_MB_RATIO
+            + ", using the default.");
+        heapRatio = MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO;
+      }
+
+      int taskContainerMb = getMemoryRequired(taskType);
+      int taskHeapSize = (int)Math.ceil(taskContainerMb * heapRatio);
+
+      String xmxArg = String.format("-Xmx%dm", taskHeapSize);
+      LOG.info("Task java-opts do not specify heap size. Setting task attempt" +
+          " jvm max heap size to " + xmxArg);
+
+      javaOpts += " " + xmxArg;
+    }
+
+    return javaOpts;
+  }
+
+  /**
+   * Parse the Maximum heap size from the java opts as specified by the -Xmx option
+   * Format: -Xmx<size>[g|G|m|M|k|K]
+   * @param javaOpts String to parse to read maximum heap size
+   * @return Maximum heap size in MB or -1 if not specified
+   */
+  @Private
+  @VisibleForTesting
+  public static int parseMaximumHeapSizeMB(String javaOpts) {
+    // Find the last matching -Xmx following word boundaries
+    Matcher m = JAVA_OPTS_XMX_PATTERN.matcher(javaOpts);
+    if (m.matches()) {
+      int size = Integer.parseInt(m.group(1));
+      if (size <= 0) {
+        return -1;
+      }
+      if (m.group(2).isEmpty()) {
+        // -Xmx specified in bytes
+        return size / (1024 * 1024);
+      }
+      char unit = m.group(2).charAt(0);
+      switch (unit) {
+        case 'g':
+        case 'G':
+          // -Xmx specified in GB
+          return size * 1024;
+        case 'm':
+        case 'M':
+          // -Xmx specified in MB
+          return size;
+        case 'k':
+        case 'K':
+          // -Xmx specified in KB
+          return size / 1024;
+      }
+    }
+    // -Xmx not specified
+    return -1;
+  }
+
+  @Private
+  public int getMemoryRequired(TaskType taskType) {
+    int memory = 1024;
+    int heapSize = parseMaximumHeapSizeMB(getConfiguredTaskJavaOpts(taskType));
+    float heapRatio = getFloat(MRJobConfig.HEAP_MEMORY_MB_RATIO,
+        MRJobConfig.DEFAULT_HEAP_MEMORY_MB_RATIO);
+    if (taskType == TaskType.MAP)  {
+      if (get(MRJobConfig.MAP_MEMORY_MB) == null && heapSize > 0) {
+        memory = (int) Math.ceil(heapSize / heapRatio);
+        LOG.info(MRJobConfig.MAP_MEMORY_MB +
+            " not specified. Derived from javaOpts = " + memory);
+      } else {
+        memory = getInt(MRJobConfig.MAP_MEMORY_MB,
+            MRJobConfig.DEFAULT_MAP_MEMORY_MB);
+      }
+    } else if (taskType == TaskType.REDUCE) {
+      if (get(MRJobConfig.REDUCE_MEMORY_MB) == null && heapSize > 0) {
+        memory = (int) Math.ceil(heapSize / heapRatio);
+        LOG.info(MRJobConfig.REDUCE_MEMORY_MB +
+            " not specified. Derived from javaOpts = " + memory);
+      } else {
+        memory = getInt(MRJobConfig.REDUCE_MEMORY_MB,
+            MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
+      }
+    }
+
+    return memory;
+  }
 
 
 }
 }
 
 

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java

@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 
 import javax.crypto.SecretKey;
 import javax.crypto.SecretKey;
 
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -624,8 +625,9 @@ abstract public class Task implements Writable, Configurable {
      * Using AtomicBoolean since we need an atomic read & reset method. 
      * Using AtomicBoolean since we need an atomic read & reset method. 
      */  
      */  
     private AtomicBoolean progressFlag = new AtomicBoolean(false);
     private AtomicBoolean progressFlag = new AtomicBoolean(false);
-    
-    TaskReporter(Progress taskProgress,
+
+    @VisibleForTesting
+    public TaskReporter(Progress taskProgress,
                  TaskUmbilicalProtocol umbilical) {
                  TaskUmbilicalProtocol umbilical) {
       this.umbilical = umbilical;
       this.umbilical = umbilical;
       this.taskProgress = taskProgress;
       this.taskProgress = taskProgress;

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -799,6 +799,11 @@ public interface MRJobConfig {
   public static final String TASK_PREEMPTION =
   public static final String TASK_PREEMPTION =
       "mapreduce.job.preemption";
       "mapreduce.job.preemption";
 
 
+  public static final String HEAP_MEMORY_MB_RATIO =
+      "mapreduce.job.heap.memory-mb.ratio";
+
+  public static final float DEFAULT_HEAP_MEMORY_MB_RATIO = 0.8f;
+
   public static final String MR_ENCRYPTED_INTERMEDIATE_DATA =
   public static final String MR_ENCRYPTED_INTERMEDIATE_DATA =
       "mapreduce.job.encrypted-intermediate-data";
       "mapreduce.job.encrypted-intermediate-data";
   public static final boolean DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA = false;
   public static final boolean DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA = false;

+ 30 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -208,9 +208,11 @@
 
 
 <property>
 <property>
   <name>mapreduce.map.memory.mb</name>
   <name>mapreduce.map.memory.mb</name>
-  <value>1024</value>
+  <!--value>1024</value-->
   <description>The amount of memory to request from the scheduler for each
   <description>The amount of memory to request from the scheduler for each
-  map task.
+    map task. If this is not specified, it is inferred from
+    mapreduce.map.java.opts and mapreduce.job.heap.memory-mb.ratio.
+    If java-opts are also not specified, we set it to 1024.
   </description>
   </description>
 </property>
 </property>
 
 
@@ -224,9 +226,11 @@
 
 
 <property>
 <property>
   <name>mapreduce.reduce.memory.mb</name>
   <name>mapreduce.reduce.memory.mb</name>
-  <value>1024</value>
+  <!--value>1024</value-->
   <description>The amount of memory to request from the scheduler for each
   <description>The amount of memory to request from the scheduler for each
-  reduce task.
+    reduce task. If this is not specified, it is inferred from
+    mapreduce.reduce.java.opts and mapreduce.job.heap.memory-mb.ratio.
+    If java-opts are also not specified, we set it to 1024.
   </description>
   </description>
 </property>
 </property>
 
 
@@ -240,7 +244,7 @@
 
 
 <property>
 <property>
   <name>mapred.child.java.opts</name>
   <name>mapred.child.java.opts</name>
-  <value>-Xmx200m</value>
+  <!--value></value-->
   <description>Java opts for the task processes.
   <description>Java opts for the task processes.
   The following symbol, if present, will be interpolated: @taskid@ is replaced 
   The following symbol, if present, will be interpolated: @taskid@ is replaced 
   by current TaskID. Any other occurrences of '@' will go unchanged.
   by current TaskID. Any other occurrences of '@' will go unchanged.
@@ -251,7 +255,10 @@
   Usage of -Djava.library.path can cause programs to no longer function if
   Usage of -Djava.library.path can cause programs to no longer function if
   hadoop native libraries are used. These values should instead be set as part 
   hadoop native libraries are used. These values should instead be set as part 
   of LD_LIBRARY_PATH in the map / reduce JVM env using the mapreduce.map.env and 
   of LD_LIBRARY_PATH in the map / reduce JVM env using the mapreduce.map.env and 
-  mapreduce.reduce.env config settings. 
+  mapreduce.reduce.env config settings.
+
+  If -Xmx is not set, it is inferred from mapreduce.{map|reduce}.memory.mb and
+  mapreduce.job.heap.memory-mb.ratio.
   </description>
   </description>
 </property>
 </property>
 
 
@@ -260,7 +267,9 @@
   <name>mapreduce.map.java.opts</name>
   <name>mapreduce.map.java.opts</name>
   <value></value>
   <value></value>
   <description>Java opts only for the child processes that are maps. If set,
   <description>Java opts only for the child processes that are maps. If set,
-  this will be used instead of mapred.child.java.opts.
+  this will be used instead of mapred.child.java.opts. If -Xmx is not set,
+  it is inferred from mapreduce.map.memory.mb and
+  mapreduce.job.heap.memory-mb.ratio.
   </description>
   </description>
 </property>
 </property>
 -->
 -->
@@ -270,7 +279,9 @@
   <name>mapreduce.reduce.java.opts</name>
   <name>mapreduce.reduce.java.opts</name>
   <value></value>
   <value></value>
   <description>Java opts only for the child processes that are reduces. If set,
   <description>Java opts only for the child processes that are reduces. If set,
-  this will be used instead of mapred.child.java.opts.
+  this will be used instead of mapred.child.java.opts. If -Xmx is not set,
+  it is inferred from mapreduce.reduce.memory.mb and
+  mapreduce.job.heap.memory-mb.ratio.
   </description>
   </description>
 </property>
 </property>
 -->
 -->
@@ -1523,4 +1534,15 @@
     - HTTPS_ONLY : Service is provided only on https
     - HTTPS_ONLY : Service is provided only on https
   </description>
   </description>
 </property>
 </property>
+
+<property>
+  <name>mapreduce.job.heap.memory-mb.ratio</name>
+  <value>0.8</value>
+  <description>The ratio of heap-size to container-size. If no -Xmx is
+    specified, it is calculated as
+    (mapreduce.{map|reduce}.memory.mb * mapreduce.heap.memory-mb.ratio).
+    If -Xmx is specified but not mapreduce.{map|reduce}.memory.mb, it is
+    calculated as (heapSize / mapreduce.heap.memory-mb.ratio).
+  </description>
+</property>
 </configuration>
 </configuration>