Browse Source

MAPREDUCE-5543. In-memory map outputs can be leaked after shuffle completes in 0.23 (Jason Lowe via jeagles)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1526677 13f79535-47bb-0310-9956-ffa450edef68
Jonathan Turner Eagles 11 years ago
parent
commit
8d33e6a705

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -43,6 +43,9 @@ Release 0.23.10 - UNRELEASED
     MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta
     via tgraves)
 
+    MAPREDUCE-5543. In-memory map outputs can be leaked after shuffle completes
+    in 0.23 (Jason Lowe via jeagles)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES

+ 5 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java

@@ -356,8 +356,11 @@ public class MergeManager<K, V> {
     
     List<MapOutput<K, V>> memory = 
       new ArrayList<MapOutput<K, V>>(inMemoryMergedMapOutputs);
+    inMemoryMergedMapOutputs.clear();
     memory.addAll(inMemoryMapOutputs);
+    inMemoryMapOutputs.clear();
     List<Path> disk = getDiskMapOutputs();
+    onDiskMapOutputs.clear();
     return finalMerge(jobConf, rfs, memory, disk);
   }
 
@@ -671,7 +674,8 @@ public class MergeManager<K, V> {
     }
   }
 
-  private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
+  @VisibleForTesting
+  RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
                                        List<MapOutput<K,V>> inMemoryMapOutputs,
                                        List<Path> onDiskMapOutputs
                                        ) throws IOException {

+ 43 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java

@@ -39,7 +39,11 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MROutputFiles;
 import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
 import org.junit.Assert;
 import org.junit.Test;
@@ -119,6 +123,45 @@ public class TestMergeManager {
         0, reporter.getNumExceptions());
   }
 
+  @Test // regression test for MAPREDUCE-5543: close() must empty the manager's output lists
+  public void testFinalMergeFreesMemory() throws Throwable {
+    JobConf conf = new JobConf();
+    TestExceptionReporter reporter = new TestExceptionReporter();
+    CyclicBarrier mergeStart = new CyclicBarrier(2); // not awaited anywhere in this test
+    CyclicBarrier mergeComplete = new CyclicBarrier(2); // not awaited anywhere in this test
+    StubbedMergeManager mgr = new StubbedMergeManager(conf, reporter,
+        mergeStart, mergeComplete) {
+
+          @Override // stubbed to return null, so no real final merge is performed
+          RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
+              List<MapOutput<Text, Text>> inMemoryMapOutputs,
+              List<Path> onDiskMapOutputs) throws IOException {
+            return null;
+          }
+    };
+
+    // reserve enough map output to cause a merge when it is committed
+    TaskAttemptID mapId1 = new TaskAttemptID(
+        new TaskID("job_1234", 0, TaskType.MAP, 0), 0);
+    MapOutput<Text, Text> out1 = mgr.reserve(mapId1, 1000, 0);
+    Assert.assertEquals("Should be a memory merge",
+        Type.MEMORY, out1.getType());
+    fillOutput(out1);
+    out1.commit();
+    TaskAttemptID mapId2 = new TaskAttemptID( // same as mapId1 except the last TaskID argument
+        new TaskID("job_1234", 0, TaskType.MAP, 1), 0);
+    MapOutput<Text, Text> out2 = mgr.reserve(mapId2, 1000, 0);
+    Assert.assertEquals("Should be a memory merge",
+        Type.MEMORY, out2.getType());
+    fillOutput(out2);
+    out2.commit();
+
+    mgr.close(); // the three lists checked below must all be empty once close() returns
+    Assert.assertEquals(0, mgr.inMemoryMapOutputs.size());
+    Assert.assertEquals(0, mgr.inMemoryMergedMapOutputs.size());
+    Assert.assertEquals(0, mgr.onDiskMapOutputs.size());
+  }
+
   private void fillOutput(MapOutput<Text, Text> output) throws IOException {
     BoundedByteArrayOutputStream stream = output.getArrayStream();
     int count = stream.getLimit();