
MAPREDUCE-3993. Graceful handling of codec errors during decompression (kkambatl via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1359348 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur, 13 years ago
parent commit 3395916da8

+ 3 - 0
CHANGES.txt

@@ -60,6 +60,9 @@ Release 1.2.0 - unreleased
 
     MAPREDUCE-4359. Potential deadlock in Counters. (tomwhite)
 
+    MAPREDUCE-3993. Graceful handling of codec errors during decompression 
+    (kkambatl via tucu)
+
 Release 1.1.0 - unreleased
 
   INCOMPATIBLE CHANGES

+ 22 - 0
src/core/org/apache/hadoop/io/IOUtils.java

@@ -138,6 +138,28 @@ public class IOUtils {
     }
   }
 
+  /**
+   * Utility wrapper for reading from {@link InputStream}. It catches any errors
+   * thrown by the underlying stream (either IO or decompression-related) and
+   * re-throws them as an IOException.
+   * 
+   * @param is - InputStream to be read from
+   * @param buf - buffer the data is read into
+   * @param off - offset within buf
+   * @param len - amount of data to be read
+   * @return number of bytes read
+   */
+  public static int wrappedReadForCompressedData(InputStream is, byte[] buf,
+      int off, int len) throws IOException {
+    try {
+      return is.read(buf, off, len);
+    } catch (IOException ie) {
+      throw ie;
+    } catch (Throwable t) {
+      throw new IOException("Error while reading compressed data", t);
+    }
+  }
+
   /** Reads len bytes in a loop.
    * @param in The InputStream to read from
    * @param buf The buffer to fill
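
The effect of the new helper is easiest to see against a stream that fails the way a misbehaving codec can: by throwing an unchecked error rather than an IOException. The sketch below is illustrative only; the demo class and the simulated InternalError are not part of the commit.

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.io.IOUtils;

public class WrappedReadDemo {
  public static void main(String[] args) {
    // A stream whose read() blows up with a non-IO error, simulating a
    // decompressor failure on a corrupt compressed block.
    InputStream faulty = new InputStream() {
      @Override
      public int read() {
        throw new InternalError("simulated codec failure");
      }
    };
    byte[] buf = new byte[64];
    try {
      IOUtils.wrappedReadForCompressedData(faulty, buf, 0, buf.length);
    } catch (IOException ioe) {
      // Without the wrapper the InternalError would escape and kill the
      // task; with it, callers can handle the failure as an IOException.
      System.out.println("caught: " + ioe.getMessage());
    }
  }
}

Note that a genuine IOException from the stream passes through the wrapper unchanged; only non-IO throwables are wrapped.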

+ 3 - 1
src/mapred/org/apache/hadoop/mapred/IFile.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -325,7 +326,8 @@ class IFile {
     private int readData(byte[] buf, int off, int len) throws IOException {
       int bytesRead = 0;
       while (bytesRead < len) {
-        int n = in.read(buf, off+bytesRead, len-bytesRead);
+        int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead,
+            len - bytesRead);
         if (n < 0) {
           return bytesRead;
         }
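
Extracted from the hunk above: the loop in IFile's readData keeps filling the requested slice until it is full or EOF is reached, and any codec error raised inside the read now surfaces as an IOException from the wrapper. A standalone sketch of the same loop follows; the class and method names here are illustrative, not from the patch.

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.io.IOUtils;

public class ReadFullyDemo {
  // Mirrors the patched readData: read until len bytes arrive or the
  // stream ends, returning however many bytes were actually read.
  static int readData(InputStream in, byte[] buf, int off, int len)
      throws IOException {
    int bytesRead = 0;
    while (bytesRead < len) {
      int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead,
          len - bytesRead);
      if (n < 0) {
        return bytesRead; // EOF before the slice was filled
      }
      bytesRead += n;
    }
    return len;
  }
}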

+ 4 - 3
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -1702,15 +1702,16 @@ class ReduceTask extends Task {
         
         int bytesRead = 0;
         try {
-          int n = input.read(shuffleData, 0, shuffleData.length);
+          int n = IOUtils.wrappedReadForCompressedData(input, shuffleData, 0,
+              shuffleData.length);
           while (n > 0) {
             bytesRead += n;
             shuffleClientMetrics.inputBytes(n);
 
             // indicate we're making progress
             reporter.progress();
-            n = input.read(shuffleData, bytesRead, 
-                           (shuffleData.length-bytesRead));
+            n = IOUtils.wrappedReadForCompressedData(input, shuffleData,
+                bytesRead, shuffleData.length - bytesRead);
           }
 
           if (LOG.isDebugEnabled()) {
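
On the shuffle side, the same wrapper means a corrupt or truncated map output that trips the codec now raises an IOException, which the fetch code can treat like any other shuffle failure instead of crashing the reduce task. Below is a simplified sketch of the patched loop, with the shuffle metrics and progress reporting omitted; the class and method names are illustrative.

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.io.IOUtils;

public class ShuffleReadDemo {
  // Simplified version of the ReduceTask fetch loop above: drain the map
  // output into shuffleData, going through the wrapper on every read.
  static int drain(InputStream input, byte[] shuffleData) throws IOException {
    int bytesRead = 0;
    int n = IOUtils.wrappedReadForCompressedData(input, shuffleData, 0,
        shuffleData.length);
    while (n > 0) {
      bytesRead += n;
      n = IOUtils.wrappedReadForCompressedData(input, shuffleData, bytesRead,
          shuffleData.length - bytesRead);
    }
    return bytesRead;
  }
}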