Bläddra i källkod

HADOOP-4319. fuse-dfs dfs_read function returns as many bytes as it is
told to read unless end-of-file is reached. (Pete Wyckoff via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.19@701299 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 16 år sedan
förälder
incheckning
9e95b5d816

+ 3 - 0
CHANGES.txt

@@ -784,6 +784,9 @@ Release 0.19.0 - Unreleased
     HADOOP-4257. The DFS client should pick only one datanode as the candidate
     to initiate lease recovery.  (Tsz Wo (Nicholas), SZE via dhruba)
 
+    HADOOP-4319. fuse-dfs dfs_read function returns as many bytes as it is
+    told to read unlesss end-of-file is reached.  (Pete Wyckoff via dhruba)
+
 Release 0.18.2 - Unreleased
 
   BUG FIXES

+ 7 - 1
src/contrib/fuse-dfs/src/fuse_dfs.c

@@ -644,9 +644,15 @@ static int dfs_read(const char *path, char *buf, size_t size, off_t offset,
   assert(path);
   assert(buf);
 
+
   dfs_fh *fh = (dfs_fh*)fi->fh;
+
+  if(size >= dfs->rdbuffer_size) {
+    return hdfsPread(fh->fs, fh->hdfsFH, offset, buf, size);
+  }
+
   //fprintf(stderr, "Cache bounds for %s: %llu -> %llu (%d bytes). Check for offset %llu\n", path, fh->startOffset, fh->startOffset + fh->sizeBuffer, fh->sizeBuffer, offset);
-  if (fh->sizeBuffer == 0  || offset < fh->startOffset || offset > (fh->startOffset + fh->sizeBuffer)  )
+  if (fh->sizeBuffer == 0  || offset < fh->startOffset || offset + size > (fh->startOffset + fh->sizeBuffer)  )
     {
       // do the actual read
       //fprintf (stderr,"Reading %s from HDFS, offset %llu, amount %d\n", path, offset, dfs->rdbuffer_size);

+ 64 - 1
src/contrib/fuse-dfs/src/test/TestFuseDFS.java

@@ -57,7 +57,8 @@ public class TestFuseDFS extends TestCase {
     String lp = System.getProperty("LD_LIBRARY_PATH") + ":" + "/usr/local/lib:" + libhdfs + ":" + jvm;
     System.err.println("LD_LIBRARY_PATH=" + lp);
     String cmd[] =  {  fuse_cmd, "dfs://" + dfs.getHost() + ":" + String.valueOf(dfs.getPort()), 
-                       mountpoint, "-obig_writes", "-odebug", "-oentry_timeout=1",  "-oattribute_timeout=1", "-ousetrash", "rw", "-oinitchecks" };
+                       mountpoint, "-obig_writes", "-odebug", "-oentry_timeout=1",  "-oattribute_timeout=1", "-ousetrash", "rw", "-oinitchecks",
+    "-ordbuffer=5000"};
     final String [] envp = {
       "CLASSPATH="+  cp,
       "LD_LIBRARY_PATH=" + lp,
@@ -381,6 +382,68 @@ public class TestFuseDFS extends TestCase {
     }
   }
 
+  /**
+   *
+   * Test dfs_read on a file size that will trigger multiple internal reads. 
+   * First, just check raw size reading is ok and then check with smaller reads
+   * including checking the validity of the data read.
+   *
+   */
+  public void testReads() throws IOException,InterruptedException  {
+    try {
+      // First create a new directory with mkdirs
+      Runtime r = Runtime.getRuntime();
+      Process p;
+
+      // create the file
+      Path myPath = new Path("/test/hello.reads");
+      FSDataOutputStream s = fileSys.create(myPath);
+      String hello = "hello world!";
+      int written = 0;
+      int mycount = 0;
+      while(written < 1024 * 9) {
+        s.writeUTF(hello);
+        s.writeInt(mycount++);
+        written += hello.length() + 4;
+      }
+      s.close();
+
+      // check it exists
+      assertTrue(fileSys.exists(myPath));
+      FileStatus foo = fileSys.getFileStatus(myPath);
+      assertTrue(foo.getLen() >= 9 * 1024);
+
+      {
+        // cat the file
+        DataInputStream is = new DataInputStream(new FileInputStream(mpoint + "/test/hello.reads"));
+        byte buf [] = new byte[4096];
+        assertTrue(is.read(buf, 0, 1024) == 1024);
+        assertTrue(is.read(buf, 0, 4096) == 4096);
+        assertTrue(is.read(buf, 0, 4096) == 4096);
+        is.close();
+      }
+
+      {
+        DataInputStream is = new DataInputStream(new FileInputStream(mpoint + "/test/hello.reads"));
+        int read = 0;
+        int counter = 0;
+        try {
+          while(true) {
+            String s2 = DataInputStream.readUTF(is);
+            int s3 = is.readInt();
+            assertTrue(s2.equals(hello));
+            assertTrue(s3 == counter++);
+            read += hello.length() + 4;
+          }
+        } catch(EOFException e) {
+          assertTrue(read >= 9 * 1024);
+        }
+      }
+    } catch(Exception e) {
+      e.printStackTrace();
+    } finally {
+    }
+  }
 
 
   /**