MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge. Contributed by Gera Shegalov

Jason Lowe, 10 years ago
parent
current commit
7dc3c1203d

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -365,6 +365,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6349. Fix typo in property org.apache.hadoop.mapreduce.
     lib.chain.Chain.REDUCER_INPUT_VALUE_CLASS. (Ray Chiang via ozawa)
 
+    MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge
+    (Gera Shegalov via jlowe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 25 - 22
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java

@@ -93,8 +93,10 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
   
   Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>();
   private final OnDiskMerger onDiskMerger;
-  
-  private final long memoryLimit;
+
+  @VisibleForTesting
+  final long memoryLimit;
+
   private long usedMemory;
   private long commitMemory;
   private final long maxSingleShuffleLimit;
@@ -167,11 +169,10 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     }
 
     // Allow unit tests to fix Runtime memory
-    this.memoryLimit = 
-      (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
-          Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
-        * maxInMemCopyUse);
- 
+    this.memoryLimit = (long)(jobConf.getLong(
+        MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+        Runtime.getRuntime().maxMemory()) * maxInMemCopyUse);
+
     this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
 
     final float singleShuffleMemoryLimitPercent =
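The hunk above removes the Integer.MAX_VALUE clamp from the default value of MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES: whenever that property was left unset on a large heap, the shuffle's in-memory budget was pinned to roughly 2 GiB. A minimal before/after sketch of the arithmetic, assuming an 8 GiB heap and a 0.90 input-buffer fraction (local names and values are illustrative, not taken from the patch):

    // Illustrative only: why the old default capped the shuffle budget at ~2 GiB.
    long xmx = 8L * 1024 * 1024 * 1024;   // assume the reducer runs with -Xmx8g
    float maxInMemCopyUse = 0.90f;        // assume a 0.90 shuffle input buffer fraction

    // Old default path: the heap size is clamped to Integer.MAX_VALUE before scaling.
    long oldLimit = (long) (Math.min(xmx, Integer.MAX_VALUE) * maxInMemCopyUse);
    // oldLimit ~= 1.9e9 bytes (about 1.8 GiB), no matter how large the heap is.

    // New default path: the full heap size feeds the calculation.
    long newLimit = (long) (xmx * maxInMemCopyUse);
    // newLimit ~= 7.7e9 bytes (about 7.2 GiB).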
@@ -202,7 +203,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
 
     if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
       throw new RuntimeException("Invalid configuration: "
-          + "maxSingleShuffleLimit should be less than mergeThreshold"
+          + "maxSingleShuffleLimit should be less than mergeThreshold "
          + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
          + "mergeThreshold: " + this.mergeThreshold);
     }
@@ -668,24 +669,26 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     }
   }
 
-  private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
-                                       List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
-                                       List<CompressAwarePath> onDiskMapOutputs
-                                       ) throws IOException {
-    LOG.info("finalMerge called with " + 
-             inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
-             onDiskMapOutputs.size() + " on-disk map-outputs");
-    
+  @VisibleForTesting
+  final long getMaxInMemReduceLimit() {
     final float maxRedPer =
-      job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
+        jobConf.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
     if (maxRedPer > 1.0 || maxRedPer < 0.0) {
-      throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT +
-                            maxRedPer);
+      throw new RuntimeException(maxRedPer + ": "
+          + MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT
+          + " must be a float between 0 and 1.0");
     }
-    int maxInMemReduce = (int)Math.min(
-        Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
-    
+    return (long)(memoryLimit * maxRedPer);
+  }
 
+  private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
+                                       List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
+                                       List<CompressAwarePath> onDiskMapOutputs
+                                       ) throws IOException {
+    LOG.info("finalMerge called with " +
+        inMemoryMapOutputs.size() + " in-memory map-outputs and " +
+        onDiskMapOutputs.size() + " on-disk map-outputs");
+    final long maxInMemReduce = getMaxInMemReduceLimit();
     // merge config params
     Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
     Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
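This hunk addresses the same ceiling inside the final merge itself: the old code computed the in-memory reduce budget as an int via (int)Math.min(..., Integer.MAX_VALUE), so it could never exceed about 2 GiB even with REDUCE_INPUT_BUFFER_PERCENT at 1.0 on a large heap. The extracted getMaxInMemReduceLimit() instead scales the long memoryLimit field. A minimal before/after sketch, assuming an 8 GiB budget (names and values are illustrative, not taken from the patch):

    long budgetBytes = 8L * 1024 * 1024 * 1024;   // assume an 8 GiB memory budget
    float maxRedPer = 1.0f;                       // REDUCE_INPUT_BUFFER_PERCENT

    // Old: result forced into an int, silently capped at Integer.MAX_VALUE (~2 GiB).
    int oldMaxInMemReduce =
        (int) Math.min(budgetBytes * maxRedPer, Integer.MAX_VALUE);   // 2147483647

    // New: stays a long, so the full budget survives into the final merge.
    long newMaxInMemReduce = (long) (budgetBytes * maxRedPer);        // 8589934592

The hunk also replaces the terse IOException with a RuntimeException whose message spells out that REDUCE_INPUT_BUFFER_PERCENT must be a float between 0 and 1.0.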

+ 29 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java

@@ -260,4 +260,33 @@ public class TestMergeManager {
     }
 
   }
+
+  @Test
+  public void testLargeMemoryLimits() throws Exception {
+    final JobConf conf = new JobConf();
+    // Xmx in production
+    conf.setLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
+        8L * 1024 * 1024 * 1024);
+
+    // M1 = Xmx fraction for map outputs
+    conf.setFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 1.0f);
+
+    // M2 = max M1 fraction for a single map output
+    conf.setFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, 0.95f);
+
+    // M3 = M1 fraction at which in memory merge is triggered
+    conf.setFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 1.0f);
+
+    // M4 = M1 fraction of map outputs remaining in memory for a reduce
+    conf.setFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 1.0f);
+
+    final MergeManagerImpl<Text, Text> mgr = new MergeManagerImpl<Text, Text>(
+        null, conf, mock(LocalFileSystem.class), null, null, null, null, null,
+        null, null, null, null, null, new MROutputFiles());
+    assertTrue("Large shuffle area unusable: " + mgr.memoryLimit,
+        mgr.memoryLimit > Integer.MAX_VALUE);
+    final long maxInMemReduce = mgr.getMaxInMemReduceLimit();
+    assertTrue("Large in-memory reduce area unusable: " + maxInMemReduce,
+        maxInMemReduce > Integer.MAX_VALUE);
+  }
 }
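Read as arithmetic, the new test configures an 8 GiB REDUCE_MEMORY_TOTAL_BYTES with the buffer fractions at or near 1.0 and asserts that both limits now clear the old 2 GiB ceiling. An informal sketch of what the assertions amount to (the formulas are assumed from the fields and hunks above, not quoted from MergeManagerImpl):

    long totalBytes     = 8L * 1024 * 1024 * 1024;        // REDUCE_MEMORY_TOTAL_BYTES
    long memoryLimit    = (long) (totalBytes * 1.0f);     // SHUFFLE_INPUT_BUFFER_PERCENT = 1.0
    long maxInMemReduce = (long) (memoryLimit * 1.0f);    // REDUCE_INPUT_BUFFER_PERCENT  = 1.0

    assert memoryLimit    > Integer.MAX_VALUE;   // 8589934592 > 2147483647
    assert maxInMemReduce > Integer.MAX_VALUE;   // the old 2 GiB ceiling is gone

The 0.95 for SHUFFLE_MEMORY_LIMIT_PERCENT, rather than 1.0, presumably keeps maxSingleShuffleLimit strictly below mergeThreshold so that the constructor check shown in the MergeManagerImpl hunk above does not throw.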