ソースを参照

MAPREDUCE-5053. java.lang.InternalError from decompression codec cause reducer to fail (Robert Parker via jeagles)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1458368 13f79535-47bb-0310-9956-ffa450edef68
Jonathan Turner Eagles 12 年 前
コミット
7f35e21070

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -100,6 +100,9 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-5042. Reducer unable to fetch for a map task that was recovered
     MAPREDUCE-5042. Reducer unable to fetch for a map task that was recovered
     (Jason Lowe via bobby)
     (Jason Lowe via bobby)
 
 
+    MAPREDUCE-5053. java.lang.InternalError from decompression codec cause
+    reducer to fail (Robert Parker via jeagles)
+
 Release 0.23.6 - 2013-02-06
 Release 0.23.6 - 2013-02-06
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 16 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

@@ -347,18 +347,23 @@ class Fetcher<K,V> extends Thread {
         return EMPTY_ATTEMPT_ID_ARRAY;
         return EMPTY_ATTEMPT_ID_ARRAY;
       } 
       } 
       
       
-      // Go!
-      LOG.info("fetcher#" + id + " about to shuffle output of map " + 
-               mapOutput.getMapId() + " decomp: " +
-               decompressedLength + " len: " + compressedLength + " to " +
-               mapOutput.getType());
-      if (mapOutput.getType() == Type.MEMORY) {
-        shuffleToMemory(host, mapOutput, input, 
-                        (int) decompressedLength, (int) compressedLength);
-      } else {
-        shuffleToDisk(host, mapOutput, input, compressedLength);
+      // lz*, snappy, etc. throw java.lang.InternalError when there is a decompression
+      // error, catching and throwing IOException to trigger fetch failure logic
+      try {
+        // Go!
+        LOG.info("fetcher#" + id + " about to shuffle output of map "
+            + mapOutput.getMapId() + " decomp: " + decompressedLength
+            + " len: " + compressedLength + " to " + mapOutput.getType());
+        if (mapOutput.getType() == Type.MEMORY) {
+          shuffleToMemory(host, mapOutput, input, (int) decompressedLength,
+              (int) compressedLength);
+        } else {
+          shuffleToDisk(host, mapOutput, input, compressedLength);
+        }
+      } catch (java.lang.InternalError e) {
+        LOG.warn("Failed to shuffle for fetcher#"+id, e);
+        throw new IOException(e);
       }
       }
-      
       // Inform the shuffle scheduler
       // Inform the shuffle scheduler
       long endTime = System.currentTimeMillis();
       long endTime = System.currentTimeMillis();
       scheduler.copySucceeded(mapId, host, compressedLength, 
       scheduler.copySucceeded(mapId, host, compressedLength,