1
0
Просмотр исходного кода

HADOOP-9307. BufferedFSInputStream.read returns wrong results after certain seeks. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1482377 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 12 лет назад
Родитель
Сommit
e29170e771

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -717,6 +717,9 @@ Release 2.0.5-beta - UNRELEASED
     Service HEALTHY, and results in null data at ActiveBreadCrumb.
     (Vinay and todd via todd)
 
+    HADOOP-9307. BufferedFSInputStream.read returns wrong results
+    after certain seeks. (todd)
+
 Release 2.0.4-alpha - 2013-04-25 
 
   INCOMPATIBLE CHANGES

+ 11 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java

@@ -69,12 +69,17 @@ implements Seekable, PositionedReadable, HasFileDescriptor {
     if( pos<0 ) {
       return;
     }
-    // optimize: check if the pos is in the buffer
-    long end = ((FSInputStream)in).getPos();
-    long start = end - count;
-    if( pos>=start && pos<end) {
-      this.pos = (int)(pos-start);
-      return;
+    if (this.pos != this.count) {
+      // optimize: check if the pos is in the buffer
+      // This optimization only works if pos != count -- if they are
+      // equal, it's possible that the previous reads were just
+      // longer than the total buffer size, and hence skipped the buffer.
+      long end = ((FSInputStream)in).getPos();
+      long start = end - count;
+      if( pos>=start && pos<end) {
+        this.pos = (int)(pos-start);
+        return;
+      }
     }
 
     // invalidate buffer

+ 72 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java

@@ -21,10 +21,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
 
 import static org.apache.hadoop.fs.FileSystemTestHelper.*;
 
 import java.io.*;
+import java.util.Arrays;
+import java.util.Random;
 
 import static org.junit.Assert.*;
 import static org.junit.Assume.assumeTrue;
@@ -41,6 +44,7 @@ public class TestLocalFileSystem {
     = System.getProperty("test.build.data","build/test/data") + "/work-dir/localfs";
 
   private final File base = new File(TEST_ROOT_DIR);
+  private final Path TEST_PATH = new Path(TEST_ROOT_DIR, "test-file");
   private Configuration conf;
   private LocalFileSystem fileSys;
 
@@ -365,6 +369,73 @@ public class TestLocalFileSystem {
     status = fileSys.getFileStatus(path);
     assertEquals(newModTime, status.getModificationTime());
     assertEquals(0, status.getAccessTime());
-}
+  }
+
+  /**
+   * Regression test for HADOOP-9307: BufferedFSInputStream returning
+   * wrong results after certain sequences of seeks and reads.
+   */
+  @Test
+  public void testBufferedFSInputStream() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setClass("fs.file.impl", RawLocalFileSystem.class, FileSystem.class);
+    conf.setInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, 4096);
+    FileSystem fs = FileSystem.newInstance(conf);
+    
+    byte[] buf = new byte[10*1024];
+    new Random().nextBytes(buf);
+    
+    // Write random bytes to file
+    FSDataOutputStream stream = fs.create(TEST_PATH);
+    try {
+      stream.write(buf);
+    } finally {
+      stream.close();
+    }
+    
+    Random r = new Random();
 
+    FSDataInputStream stm = fs.open(TEST_PATH);
+    // Record the sequence of seeks and reads which trigger a failure.
+    int seeks[] = new int[10];
+    int reads[] = new int[10];
+    try {
+      for (int i = 0; i < 1000; i++) {
+        int seekOff = r.nextInt(buf.length); 
+        int toRead = r.nextInt(Math.min(buf.length - seekOff, 32000));
+        
+        seeks[i % seeks.length] = seekOff;
+        reads[i % reads.length] = toRead;
+        verifyRead(stm, buf, seekOff, toRead);
+        
+      }
+    } catch (AssertionError afe) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Sequence of actions:\n");
+      for (int j = 0; j < seeks.length; j++) {
+        sb.append("seek @ ").append(seeks[j]).append("  ")
+          .append("read ").append(reads[j]).append("\n");
+      }
+      System.err.println(sb.toString());
+      throw afe;
+    } finally {
+      stm.close();
+    }
+  }
+  
+  private void verifyRead(FSDataInputStream stm, byte[] fileContents,
+       int seekOff, int toRead) throws IOException {
+    byte[] out = new byte[toRead];
+    stm.seek(seekOff);
+    stm.readFully(out);
+    byte[] expected = Arrays.copyOfRange(fileContents, seekOff, seekOff+toRead);
+    if (!Arrays.equals(out, expected)) {
+      String s ="\nExpected: " +
+          StringUtils.byteToHexString(expected) +
+          "\ngot:      " +
+          StringUtils.byteToHexString(out) + 
+          "\noff=" + seekOff + " len=" + toRead;
+      fail(s);
+    }
+  }
 }