Browse Source

Merge -r 1359344:1359345 from trunk to branch. FIXES: MAPREDUCE-3993

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1359347 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur 13 years ago
parent
commit
3097c15d9d

+ 22 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java

@@ -153,6 +153,28 @@ public class IOUtils {
     }
   }
   
+  /**
+   * Utility wrapper for reading from {@link InputStream}. It catches any errors
+   * thrown by the underlying stream (either IO or decompression-related), and
+   * re-throws as an IOException.
+   * 
+   * @param is - InputStream to be read from
+   * @param buf - buffer the data is read into
+   * @param off - offset within buf
+   * @param len - amount of data to be read
+   * @return number of bytes read
+   */
+  public static int wrappedReadForCompressedData(InputStream is, byte[] buf,
+      int off, int len) throws IOException {
+    try {
+      return is.read(buf, off, len);
+    } catch (IOException ie) {
+      throw ie;
+    } catch (Throwable t) {
+      throw new IOException("Error while reading compressed data", t);
+    }
+  }
+
   /**
    * Reads len bytes in a loop.
    *

+ 23 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestIOUtils.java

@@ -29,6 +29,7 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -152,4 +153,26 @@ public class TestIOUtils {
       }
     }
   }
+
+  @Test
+  public void testWrappedReadForCompressedData() throws IOException {
+    byte[] buf = new byte[2];
+    InputStream mockStream = Mockito.mock(InputStream.class);
+    Mockito.when(mockStream.read(buf, 0, 1)).thenReturn(1);
+    Mockito.when(mockStream.read(buf, 0, 2)).thenThrow(
+        new java.lang.InternalError());
+
+    try {
+      assertEquals("Check expected value", 1,
+          IOUtils.wrappedReadForCompressedData(mockStream, buf, 0, 1));
+    } catch (IOException ioe) {
+      fail("Unexpected error while reading");
+    }
+    try {
+      IOUtils.wrappedReadForCompressedData(mockStream, buf, 0, 2);
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "Error while reading compressed data", ioe);
+    }
+  }
 }

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -83,6 +83,9 @@ Release 2.0.1-alpha - UNRELEASED
 
     MAPREDUCE-2739. Update installation docs (remove YarnClientFactory) (bowang via tucu)
 
+    MAPREDUCE-3993. Graceful handling of codec errors during decompression 
+    (kkambatl via tucu)
+
 Release 2.0.0-alpha - 05-23-2012
 
   INCOMPATIBLE CHANGES

+ 3 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -379,7 +380,8 @@ public class IFile {
     private int readData(byte[] buf, int off, int len) throws IOException {
       int bytesRead = 0;
       while (bytesRead < len) {
-        int n = in.read(buf, off+bytesRead, len-bytesRead);
+        int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead,
+            len - bytesRead);
         if (n < 0) {
           return bytesRead;
         }