Browse Source

HDFS-5881. Fix skip() of the short-circuit local reader(legacy). Contributed by Kihwal Lee.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1565315 13f79535-47bb-0310-9956-ffa450edef68
Kihwal Lee 11 years ago
parent
commit
b64eee4e34

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -11,6 +11,7 @@ Release 0.23.11 - UNRELEASED
   OPTIMIZATIONS
   OPTIMIZATIONS
     
     
   BUG FIXES
   BUG FIXES
+    HDFS-5881. Fix skip() of the short-circuit local reader(legacy). (kihwal)
 
 
 Release 0.23.10 - 2013-12-09
 Release 0.23.10 - 2013-12-09
 
 

+ 24 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java

@@ -395,17 +395,17 @@ class BlockReaderLocal implements BlockReader {
         skipBuf = new byte[bytesPerChecksum];
         skipBuf = new byte[bytesPerChecksum];
       }
       }
       int ret = read(skipBuf, 0, (int)(n - remaining));
       int ret = read(skipBuf, 0, (int)(n - remaining));
-      return ret;
+      return (remaining + ret);
     }
     }
   
   
     // optimize for big gap: discard the current buffer, skip to
     // optimize for big gap: discard the current buffer, skip to
     // the beginning of the appropriate checksum chunk and then
     // the beginning of the appropriate checksum chunk and then
     // read to the middle of that chunk to be in sync with checksums.
     // read to the middle of that chunk to be in sync with checksums.
-    this.offsetFromChunkBoundary = newPosition % bytesPerChecksum;
-    long toskip = n - remaining - this.offsetFromChunkBoundary;
+    int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum;
+    long toskip = n - remaining - myOffsetFromChunkBoundary;
   
   
-    dataBuff.clear();
-    checksumBuff.clear();
+    dataBuff.position(dataBuff.limit());
+    checksumBuff.position(checksumBuff.limit());
   
   
     long dataSkipped = dataIn.skip(toskip);
     long dataSkipped = dataIn.skip(toskip);
     if (dataSkipped != toskip) {
     if (dataSkipped != toskip) {
@@ -419,17 +419,30 @@ class BlockReaderLocal implements BlockReader {
       }
       }
     }
     }
 
 
-    // read into the middle of the chunk
+    // Reset this.offsetFromChunkBoundary so that the next read
+    // returns data from the beginning of the chunk.
+    this.offsetFromChunkBoundary = 0;
+
+    // If the new position is chunk-aligned, we are done.
+    if (myOffsetFromChunkBoundary == 0) {
+      assert (toskip + remaining) == n;
+      return (toskip + remaining);
+    }
+
+    // The new position is not chunk-aligned, so we need to skip
+    // myOffsetFromChunkBoundary bytes more.
     if (skipBuf == null) {
     if (skipBuf == null) {
       skipBuf = new byte[bytesPerChecksum];
       skipBuf = new byte[bytesPerChecksum];
     }
     }
     assert skipBuf.length == bytesPerChecksum;
     assert skipBuf.length == bytesPerChecksum;
-    assert this.offsetFromChunkBoundary < bytesPerChecksum;
-    int ret = read(skipBuf, 0, this.offsetFromChunkBoundary);
+    assert myOffsetFromChunkBoundary < bytesPerChecksum;
+
+    int ret = read(skipBuf, 0, myOffsetFromChunkBoundary);
+
     if (ret == -1) {  // EOS
     if (ret == -1) {  // EOS
-      return toskip;
+      return (toskip + remaining);
     } else {
     } else {
-      return (toskip + ret);
+      return (toskip + remaining + ret);
     }
     }
   }
   }
 
 
@@ -470,4 +483,4 @@ class BlockReaderLocal implements BlockReader {
   public boolean hasSentStatusCode() {
   public boolean hasSentStatusCode() {
     return false;
     return false;
   }
   }
-}
+}

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -955,6 +955,14 @@ public class DFSInputStream extends FSInputStream {
           pos += blockReader.skip(diff);
           pos += blockReader.skip(diff);
           if (pos == targetPos) {
           if (pos == targetPos) {
             done = true;
             done = true;
+          } else {
+            // The range was already checked. If the block reader returns
+            // something unexpected instead of throwing an exception, it is
+            // most likely a bug. 
+            String errMsg = "BlockReader failed to seek to " +
+                targetPos + ". Instead, it seeked to " + pos + ".";
+            DFSClient.LOG.warn(errMsg);
+            throw new IOException(errMsg);
           }
           }
         } catch (IOException e) {//make following read to retry
         } catch (IOException e) {//make following read to retry
           if(DFSClient.LOG.isDebugEnabled()) {
           if(DFSClient.LOG.isDebugEnabled()) {

+ 11 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java

@@ -54,7 +54,10 @@ public class TestShortCircuitLocalRead {
   static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/";
   static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/";
 
 
   static final long seed = 0xDEADBEEFL;
   static final long seed = 0xDEADBEEFL;
-  static final int blockSize = 5120;
+  // The block size needs to be bigger than the internal buffer, so that the
+  // block reader can be reused.
+  static final int blockSize = 65536;
+  static final int internalDataBufSize = 512 * 64;
   boolean simulatedStorage = false;
   boolean simulatedStorage = false;
   
   
   // creates a file but does not close it
   // creates a file but does not close it
@@ -91,7 +94,12 @@ public class TestShortCircuitLocalRead {
     // Now read using a different API.
     // Now read using a different API.
     actual = new byte[expected.length-readOffset];
     actual = new byte[expected.length-readOffset];
     stm = fs.open(name);
     stm = fs.open(name);
-    long skipped = stm.skip(readOffset);
+    long skipped = 0;
+    if (readOffset >= internalDataBufSize) {
+      // force multiple seeks across internal data buffer boundary.
+      skipped += stm.read(actual, 0, internalDataBufSize);
+    }
+    skipped += stm.skip(readOffset - skipped);
     Assert.assertEquals(skipped, readOffset);
     Assert.assertEquals(skipped, readOffset);
     //Read a small number of bytes first.
     //Read a small number of bytes first.
     int nread = stm.read(actual, 0, 3);
     int nread = stm.read(actual, 0, 3);
@@ -172,6 +180,7 @@ public class TestShortCircuitLocalRead {
   public void testReadFromAnOffset() throws IOException {
   public void testReadFromAnOffset() throws IOException {
     doTestShortCircuitRead(false, 3*blockSize+100, 777);
     doTestShortCircuitRead(false, 3*blockSize+100, 777);
     doTestShortCircuitRead(true, 3*blockSize+100, 777);
     doTestShortCircuitRead(true, 3*blockSize+100, 777);
+    doTestShortCircuitRead(false, 3*blockSize+100, internalDataBufSize + 7245);
   }
   }
   
   
   @Test
   @Test