Pārlūkot izejas kodu

Merge -c 1523660 from trunk to branch-2 to fix MAPREDUCE-5493. Cleanup in-memory & on-disk segments to prevent leak on shuffle completion. Contributed by Jason Lowe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1523662 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 11 gadi atpakaļ
vecāks
revīzija
5ca8228d91

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -92,6 +92,9 @@ Release 2.1.1-beta - UNRELEASED
     MAPREDUCE-5164. mapred job and queue commands omit HADOOP_CLIENT_OPTS 
     (Nemon Lou via devaraj)
 
+    MAPREDUCE-5493. Cleanup in-memory & on-disk segments to prevent leak on
+    shuffle completion. (jlowe via acmurthy)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES

+ 3 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java

@@ -355,8 +355,11 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
     
     List<InMemoryMapOutput<K, V>> memory = 
       new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
+    inMemoryMergedMapOutputs.clear();
     memory.addAll(inMemoryMapOutputs);
+    inMemoryMapOutputs.clear();
     List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
+    onDiskMapOutputs.clear();
     return finalMerge(jobConf, rfs, memory, disk);
   }
    

+ 6 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java

@@ -82,7 +82,7 @@ public class TestMerger {
   }
   
   @Test
-  public void testInMemoryMerger() throws IOException {
+  public void testInMemoryMerger() throws Throwable {
     JobID jobId = new JobID("a", 0);
     TaskAttemptID reduceId = new TaskAttemptID(
         new TaskID(jobId, TaskType.REDUCE, 0), 0);
@@ -132,6 +132,11 @@ public class TestMerger {
     readOnDiskMapOutput(conf, fs, outPath, keys, values);
     Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
     Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
+
+    mergeManager.close();
+    Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
+    Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
+    Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
   }
   
   private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)