
MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon IOException during InMemoryMapOutput shuffle handler. Contributed by Eric Payne
(cherry picked from commit bc1bd7e5c4047b374420683d36a8c30eda6d75b6)
(cherry picked from commit 30d0f10458fbe0e3a85ea2e22a1bb3d4454fd896)

Jason Lowe 10 years ago
parent
commit
4104ab346f

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -16,6 +16,9 @@ Release 2.6.2 - UNRELEASED
     cache files so that child processes running hadoop scripts can access these
     files. (Junping Du via vinodkv)
 
+    MAPREDUCE-6334. Fetcher#copyMapOutput is leaking usedMemory upon
+    IOException during InMemoryMapOutput shuffle handler (Eric Payne via jlowe)
+
 Release 2.6.1 - 2015-09-23
 
   INCOMPATIBLE CHANGES

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

@@ -553,7 +553,10 @@ class Fetcher<K,V> extends Thread {
       metrics.successFetch();
       return null;
     } catch (IOException ioe) {
-      
+      if (mapOutput != null) {
+        mapOutput.abort();
+      }
+
       if (canRetry) {
         checkTimeoutOrRetry(host, ioe);
       } 
@@ -574,7 +577,6 @@ class Fetcher<K,V> extends Thread {
                " from " + host.getHostName(), ioe); 
 
       // Inform the shuffle-scheduler
-      mapOutput.abort();
       metrics.failedFetch();
       return new TaskAttemptID[] {mapId};
     }
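
The change hoists the abort() call to the top of the catch block, behind a null check (the IOException can arrive before any map output has been reserved), so the reserved in-memory shuffle space is released even when the fetcher takes the retry path instead of only on the later error-reporting path. A minimal, hypothetical sketch of that pattern, using simplified stand-in types rather than the actual Hadoop classes:

// Reservation/abort pattern the patch enforces, with hypothetical
// stand-in types (not the real Hadoop Fetcher/MapOutput classes).
import java.io.IOException;
import java.io.InputStream;

class CopyMapOutputSketch {
  interface MapOutput {
    void shuffle(InputStream in) throws IOException; // may fail mid-transfer
    void commit();                                    // success: keep the data
    void abort();                                     // failure: release reserved memory
  }

  static void copyMapOutput(MapOutput mapOutput, InputStream in) {
    try {
      mapOutput.shuffle(in);
      mapOutput.commit();
    } catch (IOException ioe) {
      // Release the reservation as soon as the failure is seen; without
      // this, the memory reserved for an in-memory output stays "used"
      // forever, which is the leak MAPREDUCE-6334 describes.
      if (mapOutput != null) {
        mapOutput.abort();
      }
      // ... retry handling / error reporting happens after the
      // reservation has been freed
    }
  }
}

The new testCopyFromHostWithRetryUnreserve case below drives this path directly: it stubs InMemoryMapOutput#shuffle to throw an IOException and verifies that abort() is called on the reserved output.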

+ 34 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java

@@ -628,6 +628,40 @@ public class TestFetcher {
     verify(odmo).abort();
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(timeout=10000)
+  public void testCopyFromHostWithRetryUnreserve() throws Exception {
+    InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry,
+        id, ss, mm, r, metrics, except, key, connection);
+
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+
+    when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+        .thenReturn(replyHash);
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    header.write(new DataOutputStream(bout));
+    ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+    when(connection.getInputStream()).thenReturn(in);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
+        .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+
+    // Verify that unreserve occurs if an exception happens after shuffle
+    // buffer is reserved.
+    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+        .thenReturn(immo);
+    doThrow(new IOException("forced error")).when(immo).shuffle(
+        any(MapHost.class), any(InputStream.class), anyLong(),
+        anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+
+    underTest.copyFromHost(host);
+    verify(immo).abort();
+  }
+
   public static class FakeFetcher<K,V> extends Fetcher<K,V> {
 
    // Whether the connection needs to be reopened.