
MAPREDUCE-5053. java.lang.InternalError from decompression codec causes reducer to fail (Robert Parker via jeagles)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1458350 13f79535-47bb-0310-9956-ffa450edef68
Jonathan Turner Eagles 12 years ago
parent
commit
523d3daac0

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -824,6 +824,9 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-5042. Reducer unable to fetch for a map task that was recovered
     (Jason Lowe via bobby)
 
+    MAPREDUCE-5053. java.lang.InternalError from decompression codec causes
+    reducer to fail (Robert Parker via jeagles)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 14 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

@@ -357,13 +357,20 @@ class Fetcher<K,V> extends Thread {
         return EMPTY_ATTEMPT_ID_ARRAY;
       } 
       
-      // Go!
-      LOG.info("fetcher#" + id + " about to shuffle output of map " + 
-               mapOutput.getMapId() + " decomp: " +
-               decompressedLength + " len: " + compressedLength + " to " +
-               mapOutput.getDescription());
-      mapOutput.shuffle(host, input, compressedLength, decompressedLength,
-                        metrics, reporter);
+      // The codecs for lzo, lz4, snappy, bz2, etc. throw
+      // java.lang.InternalError on decompression failures. Catch and
+      // re-throw as an IOException so the fetch failure logic can run.
+      try {
+        // Go!
+        LOG.info("fetcher#" + id + " about to shuffle output of map "
+            + mapOutput.getMapId() + " decomp: " + decompressedLength
+            + " len: " + compressedLength + " to " + mapOutput.getDescription());
+        mapOutput.shuffle(host, input, compressedLength, decompressedLength,
+            metrics, reporter);
+      } catch (java.lang.InternalError e) {
+        LOG.warn("Failed to shuffle for fetcher#" + id, e);
+        throw new IOException(e);
+      }
       
       // Inform the shuffle scheduler
       long endTime = System.currentTimeMillis();
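
For context, a standalone sketch of the pattern this hunk applies (illustrative names, not Hadoop's API): InternalError extends Error, so an ordinary catch (IOException) never sees it and the fetcher thread dies; wrapping it in an IOException lets the existing fetch-failure handling take over.

import java.io.IOException;

public class ShuffleRetrySketch {

  // Stand-in for mapOutput.shuffle(): native codecs (lzo, lz4, snappy,
  // bz2) can surface a corrupt stream as java.lang.InternalError.
  static void shuffle() {
    throw new InternalError("corrupt compressed stream");
  }

  // Mirrors the patched block: catch the Error, re-throw as IOException.
  static void fetch() throws IOException {
    try {
      shuffle();
    } catch (InternalError e) {
      throw new IOException(e);
    }
  }

  public static void main(String[] args) {
    try {
      fetch();
    } catch (IOException e) {
      // The failure path now sees an ordinary IOException and can mark
      // the map output failed and retry, instead of the fetcher thread
      // dying on an uncaught Error.
      System.out.println("fetch failed, eligible for retry: " + e.getCause());
    }
  }
}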

+ 60 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java

@@ -25,6 +25,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.SocketTimeoutException;
 import java.net.URL;
@@ -233,4 +234,62 @@ public class TestFetcher {
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
   }
   
-}
+  @SuppressWarnings("unchecked")
+  @Test(timeout=10000) 
+  public void testCopyFromHostCompressFailure() throws Exception {
+    LOG.info("testCopyFromHostCompressFailure");
+    JobConf job = new JobConf();
+    TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
+    ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+    MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
+    InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
+    Reporter r = mock(Reporter.class);
+    ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+    ExceptionReporter except = mock(ExceptionReporter.class);
+    SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
+    HttpURLConnection connection = mock(HttpURLConnection.class);
+    
+    Counters.Counter allErrs = mock(Counters.Counter.class);
+    when(r.getCounter(anyString(), anyString()))
+      .thenReturn(allErrs);
+    
+    Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
+        r, metrics, except, key, connection);
+    
+
+    MapHost host = new MapHost("localhost", "http://localhost:8080/");
+    
+    ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
+    TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
+    maps.add(map1ID);
+    TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+    maps.add(map2ID);
+    when(ss.getMapsForHost(host)).thenReturn(maps);
+    
+    String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
+    String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+    
+    when(connection.getResponseCode()).thenReturn(200);
+    when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+      .thenReturn(replyHash);
+    ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    header.write(new DataOutputStream(bout));
+    ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+    when(connection.getInputStream()).thenReturn(in);
+    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+      .thenReturn(immo);
+    
+    doThrow(new java.lang.InternalError())
+    .when(immo)
+      .shuffle(any(MapHost.class), any(InputStream.class), anyLong(), 
+               anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+
+    underTest.copyFromHost(host);
+       
+    verify(connection)
+      .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, 
+          encHash);
+    verify(ss, times(1)).copyFailed(map1ID, host, true, false);
+  }
+}
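
A side note on the test technique: because shuffle() returns void, the stub must use Mockito's doThrow(...).when(...) form rather than when(...).thenReturn(...). A minimal self-contained sketch of the same stub-and-verify pattern, using hypothetical types rather than the Hadoop classes above:

import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.*;

import org.junit.Test;

public class DoThrowSketchTest {

  // Hypothetical stand-in for InMemoryMapOutput's void shuffle().
  interface Output { void shuffle(); }

  // Hypothetical caller that converts the Error into a failure result.
  static boolean fetch(Output out) {
    try {
      out.shuffle();
      return true;
    } catch (InternalError e) {
      return false; // the real Fetcher wraps this in an IOException
    }
  }

  @Test(timeout = 10000)
  public void reportsFailureWhenShuffleThrows() {
    Output out = mock(Output.class);
    // doThrow().when() is the form Mockito requires for void methods.
    doThrow(new InternalError()).when(out).shuffle();

    assertFalse(fetch(out));         // failure path taken, thread survives
    verify(out, times(1)).shuffle(); // the shuffle was actually attempted
  }
}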