
HDFS-14564: Add libhdfs APIs for readFully; add readFully to ByteBufferPositionedReadable (#963) Contributed by Sahil Takiar.

Reviewed-by: Siyao Meng <smeng@cloudera.com>
Sahil Takiar, 5 years ago
Commit 13b427fc05
16 changed files with 427 additions and 67 deletions
  1. +44 -23 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
  2. +24 -0 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
  3. +16 -7 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
  4. +84 -31 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java
  5. +24 -0 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java
  6. +5 -0 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java
  7. +5 -0 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java
  8. +14 -0 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
  9. +41 -2 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c
  10. +117 -0 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c
  11. +20 -1 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h
  12. +6 -0 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
  13. +1 -0 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_defines.h
  14. +1 -0 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h
  15. +1 -0 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h
  16. +24 -3 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java
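Taken together, the patch gives both Java and libhdfs callers a positional read that is guaranteed either to fill the supplied buffer or to fail with EOFException. A minimal Java sketch of the new call path (the `preadExactly` helper and its `fs`/`path` parameters are illustrative, not part of the patch):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Read exactly len bytes at offset, or throw EOFException if the file
 *  ends first; the stream's own offset is never moved. */
static ByteBuffer preadExactly(FileSystem fs, Path path, long offset, int len)
    throws IOException {
  ByteBuffer buf = ByteBuffer.allocate(len);
  try (FSDataInputStream in = fs.open(path)) {
    in.readFully(offset, buf);  // positional ByteBuffer readFully added here
  }
  buf.flip(); // expose the len bytes that were just read
  return buf;
}
```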

+ 44 - 23
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java

@@ -330,8 +330,8 @@ public class CryptoInputStream extends FilterInputStream implements
      throws IOException {
    checkStream();
    if (!(in instanceof PositionedReadable)) {
-      throw new UnsupportedOperationException("This stream does not support " +
-          "positioned read.");
+      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+          + " does not support positioned read.");
    }
    final int n = ((PositionedReadable) in).read(position, buffer, offset,
        length);
@@ -351,8 +351,8 @@ public class CryptoInputStream extends FilterInputStream implements
      throws IOException {
    checkStream();
    if (!(in instanceof ByteBufferPositionedReadable)) {
-      throw new UnsupportedOperationException("This stream does not support " +
-          "positioned reads with byte buffers.");
+      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+          + " does not support positioned reads with byte buffers.");
    }
    int bufPos = buf.position();
    final int n = ((ByteBufferPositionedReadable) in).read(position, buf);
@@ -363,7 +363,27 @@ public class CryptoInputStream extends FilterInputStream implements

    return n;
  }
-
+
+  /**
+   * Positioned readFully using {@link ByteBuffer}s. This method is thread-safe.
+   */
+  @Override
+  public void readFully(long position, final ByteBuffer buf)
+      throws IOException {
+    checkStream();
+    if (!(in instanceof ByteBufferPositionedReadable)) {
+      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+          + " does not support positioned reads with byte buffers.");
+    }
+    int bufPos = buf.position();
+    ((ByteBufferPositionedReadable) in).readFully(position, buf);
+    final int n = buf.position() - bufPos;
+    if (n > 0) {
+      // This operation does not change the current offset of the file
+      decrypt(position, buf, n, bufPos);
+    }
+  }
+
  /**
   * Decrypt length bytes in buffer starting at offset. Output is also put 
   * into buffer starting at offset. It is thread-safe.
@@ -480,8 +500,8 @@ public class CryptoInputStream extends FilterInputStream implements
      throws IOException {
    checkStream();
    if (!(in instanceof PositionedReadable)) {
-      throw new UnsupportedOperationException("This stream does not support " +
-          "positioned readFully.");
+      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+          + " does not support positioned readFully.");
    }
    ((PositionedReadable) in).readFully(position, buffer, offset, length);
    if (length > 0) {
@@ -513,8 +533,8 @@ public class CryptoInputStream extends FilterInputStream implements
      }
    } else {
      if (!(in instanceof Seekable)) {
-        throw new UnsupportedOperationException("This stream does not " +
-                "support seek.");
+        throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+            + " does not support seek.");
      }
      ((Seekable) in).seek(pos);
      resetStreamOffset(pos);
@@ -672,8 +692,8 @@ public class CryptoInputStream extends FilterInputStream implements
        "Cannot seek to negative offset.");
    checkStream();
    if (!(in instanceof Seekable)) {
-      throw new UnsupportedOperationException("This stream does not support " +
-          "seekToNewSource.");
+      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+          + " does not support seekToNewSource.");
    }
    boolean result = ((Seekable) in).seekToNewSource(targetPos);
    resetStreamOffset(targetPos);
@@ -687,16 +707,16 @@ public class CryptoInputStream extends FilterInputStream implements
    checkStream();
    if (outBuffer.remaining() > 0) {
      if (!(in instanceof Seekable)) {
-        throw new UnsupportedOperationException("This stream does not " +
-                "support seek.");
+        throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+            + " does not support seek.");
      }
      // Have some decrypted data unread, need to reset.
      ((Seekable) in).seek(getPos());
      resetStreamOffset(getPos());
    }
    if (!(in instanceof HasEnhancedByteBufferAccess)) {
-      throw new UnsupportedOperationException("This stream does not support " +
-          "enhanced byte buffer access.");
+      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+          + " does not support enhanced byte buffer access.");
    }
    final ByteBuffer buffer = ((HasEnhancedByteBufferAccess) in).
        read(bufferPool, maxLength, opts);
@@ -714,8 +734,8 @@ public class CryptoInputStream extends FilterInputStream implements
  @Override
  public void releaseBuffer(ByteBuffer buffer) {
    if (!(in instanceof HasEnhancedByteBufferAccess)) {
-      throw new UnsupportedOperationException("This stream does not support " + 
-          "release buffer.");
+      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+          + " does not support release buffer.");
    }
    ((HasEnhancedByteBufferAccess) in).releaseBuffer(buffer);
  }
@@ -724,8 +744,8 @@ public class CryptoInputStream extends FilterInputStream implements
  public void setReadahead(Long readahead) throws IOException,
      UnsupportedOperationException {
    if (!(in instanceof CanSetReadahead)) {
-      throw new UnsupportedOperationException("This stream does not support " +
-          "setting the readahead caching strategy.");
+      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+          + " does not support setting the readahead caching strategy.");
    }
    ((CanSetReadahead) in).setReadahead(readahead);
  }
@@ -734,8 +754,9 @@ public class CryptoInputStream extends FilterInputStream implements
  public void setDropBehind(Boolean dropCache) throws IOException,
      UnsupportedOperationException {
    if (!(in instanceof CanSetReadahead)) {
-      throw new UnsupportedOperationException("This stream does not " +
-          "support setting the drop-behind caching setting.");
+      throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+          + " stream does not support setting the drop-behind caching"
+          + " setting.");
    }
    ((CanSetDropBehind) in).setDropBehind(dropCache);
  }
@@ -842,8 +863,8 @@ public class CryptoInputStream extends FilterInputStream implements
    case StreamCapabilities.READBYTEBUFFER:
    case StreamCapabilities.PREADBYTEBUFFER:
      if (!(in instanceof StreamCapabilities)) {
-        throw new UnsupportedOperationException("This stream does not expose " +
-          "its stream capabilities.");
+        throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+          + " does not expose its stream capabilities.");
      }
      return ((StreamCapabilities) in).hasCapability(capability);
    default:
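The new CryptoInputStream#readFully delegates to the wrapped stream and then decrypts the freshly filled region in place, so like the other positioned reads it never moves the stream's own offset. A hedged sketch of what that contract gives a caller (the method name and `in` parameter are illustrative, not part of the patch):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;

// Positioned readFully is documented not to change the current file
// offset, so it is safe to interleave with sequential reads.
static void preadDoesNotMoveOffset(FSDataInputStream in) throws IOException {
  long before = in.getPos();
  in.readFully(0, ByteBuffer.allocate(16)); // fill 16 bytes from offset 0
  assert in.getPos() == before; // sequential position is untouched
}
```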

+ 24 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java

@@ -17,6 +17,7 @@
 */
package org.apache.hadoop.fs;

+import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

@@ -55,6 +56,8 @@ public interface ByteBufferPositionedReadable {
   * <p>
   * Implementations should treat 0-length requests as legitimate, and must not
   * signal an error upon their receipt.
+   * <p>
+   * This does not change the current offset of a file, and is thread-safe.
   *
   * @param position position within file
   * @param buf the ByteBuffer to receive the results of the read operation.
@@ -63,4 +66,25 @@ public interface ByteBufferPositionedReadable {
   * @throws IOException if there is some error performing the read
   */
  int read(long position, ByteBuffer buf) throws IOException;
+
+  /**
+   * Reads {@code buf.remaining()} bytes into buf from a given position in
+   * the file, or until the end of the data is reached before the read
+   * operation completes. Callers should use {@code buf.limit(...)} to
+   * control the size of the desired read and {@code buf.position(...)} to
+   * control the offset into the buffer the data should be written to.
+   * <p>
+   * This operation provides similar semantics to
+   * {@link #read(long, ByteBuffer)}; the difference is that this method is
+   * guaranteed to read data until the {@link ByteBuffer} is full, or until
+   * the end of the data stream is reached.
+   *
+   * @param position position within file
+   * @param buf the ByteBuffer to receive the results of the read operation.
+   * @throws IOException if there is some error performing the read
+   * @throws EOFException if the end of the data was reached before
+   * the read operation completed
+   * @see #read(long, ByteBuffer)
+   */
+  void readFully(long position, ByteBuffer buf) throws IOException;
}
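The javadoc's limit/position contract is easiest to see in a short example. A sketch, assuming `in` is any stream implementing the interface (the helper is hypothetical):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.ByteBufferPositionedReadable;

// Read file bytes [100, 164) into buffer slots [8, 72): position() chooses
// where the data lands, limit() caps how many bytes are requested.
static void readIntoSlice(ByteBufferPositionedReadable in) throws IOException {
  ByteBuffer buf = ByteBuffer.allocate(128);
  buf.position(8);
  buf.limit(72);               // exactly 64 bytes remain to be filled
  in.readFully(100, buf);      // throws EOFException if the file is shorter
  // On return buf.position() == 72 and slots [8, 72) hold the file data.
}
```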

+ 16 - 7
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

@@ -52,8 +52,8 @@ public class FSDataInputStream extends DataInputStream
  public FSDataInputStream(InputStream in) {
    super(in);
    if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
-      throw new IllegalArgumentException(
-          "In is not an instance of Seekable or PositionedReadable");
+      throw new IllegalArgumentException(in.getClass().getCanonicalName() +
+          " is not an instance of Seekable or PositionedReadable");
    }
  }
  
@@ -150,7 +150,7 @@ public class FSDataInputStream extends DataInputStream
    }

    throw new UnsupportedOperationException("Byte-buffer read unsupported " +
-            "by input stream");
+            "by " + in.getClass().getCanonicalName());
  }

  @Override
@@ -170,9 +170,8 @@ public class FSDataInputStream extends DataInputStream
    try {
      ((CanSetReadahead)in).setReadahead(readahead);
    } catch (ClassCastException e) {
-      throw new UnsupportedOperationException(
-          "this stream does not support setting the readahead " +
-          "caching strategy.");
+      throw new UnsupportedOperationException(in.getClass().getCanonicalName() +
+          " does not support setting the readahead caching strategy.");
    }
  }

@@ -256,6 +255,16 @@ public class FSDataInputStream extends DataInputStream
      return ((ByteBufferPositionedReadable) in).read(position, buf);
    }
    throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
-        "by input stream");
+        "by " + in.getClass().getCanonicalName());
+  }
+
+  @Override
+  public void readFully(long position, ByteBuffer buf) throws IOException {
+    if (in instanceof ByteBufferPositionedReadable) {
+      ((ByteBufferPositionedReadable) in).readFully(position, buf);
+    } else {
+      throw new UnsupportedOperationException("Byte-buffer pread " +
+              "unsupported by " + in.getClass().getCanonicalName());
+    }
  }
}
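FSDataInputStream only forwards readFully when the wrapped stream implements ByteBufferPositionedReadable; anything else gets the UnsupportedOperationException above. Callers who cannot guarantee the stream type can probe first with StreamCapabilities, as the PREADBYTEBUFFER case in the CryptoInputStream hunk suggests. A hedged sketch with a hypothetical fallback helper:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.StreamCapabilities;

// Use the ByteBuffer pread path only when the stream advertises it;
// otherwise fall back to the classic byte[] positioned readFully.
static void readFullyPortably(FSDataInputStream in, long pos, ByteBuffer buf)
    throws IOException {
  if (in.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) {
    in.readFully(pos, buf);
  } else {
    byte[] tmp = new byte[buf.remaining()];
    in.readFully(pos, tmp, 0, tmp.length);
    buf.put(tmp);
  }
}
```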

+ 84 - 31
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java

@@ -388,42 +388,41 @@ public abstract class CryptoStreamsTestBase {
    Assert.assertArrayEquals(readData, expectedData);
  }
  
-  /** Test read fully */
+  /** Test read fully. */
  @Test(timeout=120000)
  public void testReadFully() throws Exception {
    OutputStream out = getOutputStream(defaultBufferSize);
    writeData(out);
    
-    InputStream in = getInputStream(defaultBufferSize);
-    final int len1 = dataLen / 4;
-    // Read len1 bytes
-    byte[] readData = new byte[len1];
-    readAll(in, readData, 0, len1);
-    byte[] expectedData = new byte[len1];
-    System.arraycopy(data, 0, expectedData, 0, len1);
-    Assert.assertArrayEquals(readData, expectedData);
-    
-    // Pos: 1/3 dataLen
-    readFullyCheck(in, dataLen / 3);
-    
-    // Read len1 bytes
-    readData = new byte[len1];
-    readAll(in, readData, 0, len1);
-    expectedData = new byte[len1];
-    System.arraycopy(data, len1, expectedData, 0, len1);
-    Assert.assertArrayEquals(readData, expectedData);
-    
-    // Pos: 1/2 dataLen
-    readFullyCheck(in, dataLen / 2);
-    
-    // Read len1 bytes
-    readData = new byte[len1];
-    readAll(in, readData, 0, len1);
-    expectedData = new byte[len1];
-    System.arraycopy(data, 2 * len1, expectedData, 0, len1);
-    Assert.assertArrayEquals(readData, expectedData);
-    
-    in.close();
+    try (InputStream in = getInputStream(defaultBufferSize)) {
+      final int len1 = dataLen / 4;
+      // Read len1 bytes
+      byte[] readData = new byte[len1];
+      readAll(in, readData, 0, len1);
+      byte[] expectedData = new byte[len1];
+      System.arraycopy(data, 0, expectedData, 0, len1);
+      Assert.assertArrayEquals(readData, expectedData);
+
+      // Pos: 1/3 dataLen
+      readFullyCheck(in, dataLen / 3);
+
+      // Read len1 bytes
+      readData = new byte[len1];
+      readAll(in, readData, 0, len1);
+      expectedData = new byte[len1];
+      System.arraycopy(data, len1, expectedData, 0, len1);
+      Assert.assertArrayEquals(readData, expectedData);
+
+      // Pos: 1/2 dataLen
+      readFullyCheck(in, dataLen / 2);
+
+      // Read len1 bytes
+      readData = new byte[len1];
+      readAll(in, readData, 0, len1);
+      expectedData = new byte[len1];
+      System.arraycopy(data, 2 * len1, expectedData, 0, len1);
+      Assert.assertArrayEquals(readData, expectedData);
+    }
  }
  
  private void readFullyCheck(InputStream in, int pos) throws Exception {
@@ -441,6 +440,60 @@ public abstract class CryptoStreamsTestBase {
    } catch (EOFException e) {
    }
  }
+
+  /** Test byte buffer read fully. */
+  @Test(timeout=120000)
+  public void testByteBufferReadFully() throws Exception {
+    OutputStream out = getOutputStream(defaultBufferSize);
+    writeData(out);
+
+    try (InputStream in = getInputStream(defaultBufferSize)) {
+      final int len1 = dataLen / 4;
+      // Read len1 bytes
+      byte[] readData = new byte[len1];
+      readAll(in, readData, 0, len1);
+      byte[] expectedData = new byte[len1];
+      System.arraycopy(data, 0, expectedData, 0, len1);
+      Assert.assertArrayEquals(readData, expectedData);
+
+      // Pos: 1/3 dataLen
+      byteBufferReadFullyCheck(in, dataLen / 3);
+
+      // Read len1 bytes
+      readData = new byte[len1];
+      readAll(in, readData, 0, len1);
+      expectedData = new byte[len1];
+      System.arraycopy(data, len1, expectedData, 0, len1);
+      Assert.assertArrayEquals(readData, expectedData);
+
+      // Pos: 1/2 dataLen
+      byteBufferReadFullyCheck(in, dataLen / 2);
+
+      // Read len1 bytes
+      readData = new byte[len1];
+      readAll(in, readData, 0, len1);
+      expectedData = new byte[len1];
+      System.arraycopy(data, 2 * len1, expectedData, 0, len1);
+      Assert.assertArrayEquals(readData, expectedData);
+    }
+  }
+
+  private void byteBufferReadFullyCheck(InputStream in, int pos)
+          throws Exception {
+    ByteBuffer result = ByteBuffer.allocate(dataLen - pos);
+    ((ByteBufferPositionedReadable) in).readFully(pos, result);
+
+    byte[] expectedData = new byte[dataLen - pos];
+    System.arraycopy(data, pos, expectedData, 0, dataLen - pos);
+    Assert.assertArrayEquals(result.array(), expectedData);
+
+    result = ByteBuffer.allocate(dataLen); // Exceeds maximum length
+    try {
+      ((ByteBufferPositionedReadable) in).readFully(pos, result);
+      Assert.fail("readFully past the end of the stream should fail.");
+    } catch (EOFException e) {
+    }
+  }
  
  /** Test seek to different position. */
  @Test(timeout=120000)

+ 24 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java

@@ -330,6 +330,30 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
      return -1;
    }

+    @Override
+    public void readFully(long position, ByteBuffer buf) throws IOException {
+      if (buf == null) {
+        throw new NullPointerException();
+      } else if (!buf.hasRemaining()) {
+        return;
+      }
+
+      if (position > length) {
+        throw new IOException("Cannot read after EOF.");
+      }
+      if (position < 0) {
+        throw new IOException("Cannot read to negative offset.");
+      }
+
+      checkStream();
+
+      if (position + buf.remaining() > length) {
+        throw new EOFException("Reached the end of the stream.");
+      }
+
+      buf.put(data, (int) position, buf.remaining());
+    }
+
    @Override
    public void readFully(long position, byte[] b, int off, int len)
        throws IOException {

+ 5 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java

@@ -95,6 +95,11 @@ public class TestCryptoStreamsForLocalFS extends CryptoStreamsTestBase {
  @Override
  @Test(timeout=10000)
  public void testPositionedReadWithByteBuffer() throws IOException {}
+
+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override
+  @Test(timeout=10000)
+  public void testByteBufferReadFully() throws Exception {}
  
  @Ignore("ChecksumFSOutputSummer doesn't support Syncable")
  @Override

+ 5 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java

@@ -96,6 +96,11 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase {
  @Test(timeout=10000)
  public void testPositionedReadWithByteBuffer() throws IOException {}

+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override
+  @Test(timeout=10000)
+  public void testByteBufferReadFully() throws Exception {}
+
  @Ignore("Wrapped stream doesn't support ReadFully")
  @Override
  @Test(timeout=10000)

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
@@ -1570,6 +1571,19 @@ public class DFSInputStream extends FSInputStream
    return pread(position, buf);
  }

+  @Override
+  public void readFully(long position, final ByteBuffer buf)
+      throws IOException {
+    int nread = 0;
+    while (buf.hasRemaining()) {
+      int nbytes = read(position + nread, buf);
+      if (nbytes < 0) {
+        throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+      }
+      nread += nbytes;
+    }
+  }
+
  /** Utility class to encapsulate data node info and its address. */
  static final class DNAddrPair {
    final DatanodeInfo info;

+ 41 - 2
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c

@@ -360,6 +360,25 @@ int main(int argc, char **argv) {
            shutdown_and_exit(cl, -1);
        }

+        // hdfsPreadFully (direct) test
+        if (hdfsPreadFully(fs, preadFile, 0, (void*)buffer,
+                (tSize)(strlen(fileContents) + 1))) {
+            fprintf(stderr, "Failed to preadFully (direct).");
+            shutdown_and_exit(cl, -1);
+        }
+        if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+            fprintf(stderr, "Failed to preadFully (direct). Expected %s but "
+                            "got %s\n", fileContents, buffer);
+            shutdown_and_exit(cl, -1);
+        }
+        fprintf(stderr, "PreadFully (direct) following %d bytes:\n%s\n",
+                num_pread_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents) + 1);
+        if (hdfsTell(fs, preadFile) != 0) {
+            fprintf(stderr, "PreadFully changed position of file\n");
+            shutdown_and_exit(cl, -1);
+        }
+
        // Disable the direct pread path so that we really go through the slow
        // read path
        hdfsFileDisableDirectPread(preadFile);
@@ -388,19 +407,39 @@ int main(int argc, char **argv) {
            shutdown_and_exit(cl, -1);
        }

+        // Test pread midway through the file rather than at the beginning
        num_pread_bytes = hdfsPread(fs, preadFile, 7, (void*)buffer, sizeof(buffer));
        if (strncmp(fileContentsChunk, buffer, strlen(fileContentsChunk)) != 0) {
-            fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n",
+            fprintf(stderr, "Failed to pread. Expected %s but got %s (%d bytes)\n",
                    fileContentsChunk, buffer, num_read_bytes);
            shutdown_and_exit(cl, -1);
        }
-        fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", num_pread_bytes, buffer);
+        fprintf(stderr, "Pread following %d bytes:\n%s\n", num_pread_bytes, buffer);
        memset(buffer, 0, strlen(fileContents + 1));
        if (hdfsTell(fs, preadFile) != 0) {
            fprintf(stderr, "Pread changed position of file\n");
            shutdown_and_exit(cl, -1);
        }

+        // hdfsPreadFully test
+        if (hdfsPreadFully(fs, preadFile, 0, (void*)buffer,
+                            (tSize)(strlen(fileContents) + 1))) {
+            fprintf(stderr, "Failed to preadFully.");
+            shutdown_and_exit(cl, -1);
+        }
+        if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+            fprintf(stderr, "Failed to preadFully. Expected %s but got %s\n",
+                    fileContents, buffer);
+            shutdown_and_exit(cl, -1);
+        }
+        fprintf(stderr, "PreadFully following %d bytes:\n%s\n",
+                num_pread_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents) + 1);
+        if (hdfsTell(fs, preadFile) != 0) {
+            fprintf(stderr, "PreadFully changed position of file\n");
+            shutdown_and_exit(cl, -1);
+        }
+
        hdfsCloseFile(fs, preadFile);

        // Test correct behaviour for unsupported filesystems

+ 117 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c

@@ -57,6 +57,9 @@ tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);
tSize preadDirect(hdfsFS fs, hdfsFile file, tOffset position, void* buffer,
                  tSize length);

+int preadFullyDirect(hdfsFS fs, hdfsFile file, tOffset position, void* buffer,
+                  tSize length);
+
static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo);

/**
@@ -1645,6 +1648,7 @@ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
            "hdfsPread: NewByteArray");
        return -1;
    }
+
    jthr = invokeMethod(env, &jVal, INSTANCE, f->file,
            JC_FS_DATA_INPUT_STREAM, "read", "(J[BII)I", position,
            jbRarray, 0, length);
@@ -1727,6 +1731,119 @@ tSize preadDirect(hdfsFS fs, hdfsFile f, tOffset position, void* buffer,
    return jVal.i;
}

+/**
+ * Like hdfsPread, if the underlying stream supports the
+ * ByteBufferPositionedReadable interface then this method will transparently
+ * use readFully(long, ByteBuffer).
+ */
+int hdfsPreadFully(hdfsFS fs, hdfsFile f, tOffset position,
+                void* buffer, tSize length) {
+    JNIEnv* env;
+    jbyteArray jbRarray;
+    jthrowable jthr;
+
+    if (length == 0) {
+        return 0;
+    } else if (length < 0) {
+        errno = EINVAL;
+        return -1;
+    }
+    if (!f || f->type == HDFS_STREAM_UNINITIALIZED) {
+        errno = EBADF;
+        return -1;
+    }
+
+    if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD) {
+        return preadFullyDirect(fs, f, position, buffer, length);
+    }
+
+    env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return -1;
+    }
+
+    //Error checking... make sure that this file is 'readable'
+    if (f->type != HDFS_STREAM_INPUT) {
+        fprintf(stderr, "Cannot read from a non-InputStream object!\n");
+        errno = EINVAL;
+        return -1;
+    }
+
+    // JAVA EQUIVALENT:
+    //  byte [] bR = new byte[length];
+    //  fis.readFully(pos, bR, 0, length);
+    jbRarray = (*env)->NewByteArray(env, length);
+    if (!jbRarray) {
+        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+                                             "hdfsPreadFully: NewByteArray");
+        return -1;
+    }
+
+    jthr = invokeMethod(env, NULL, INSTANCE, f->file,
+                        JC_FS_DATA_INPUT_STREAM, "readFully", "(J[BII)V",
+                        position, jbRarray, 0, length);
+    if (jthr) {
+        destroyLocalReference(env, jbRarray);
+        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                                      "hdfsPreadFully: FSDataInputStream#readFully");
+        return -1;
+    }
+
+    (*env)->GetByteArrayRegion(env, jbRarray, 0, length, buffer);
+    destroyLocalReference(env, jbRarray);
+    if ((*env)->ExceptionCheck(env)) {
+        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+                "hdfsPreadFully: GetByteArrayRegion");
+        return -1;
+    }
+    return 0;
+}
+
+int preadFullyDirect(hdfsFS fs, hdfsFile f, tOffset position, void* buffer,
+                  tSize length)
+{
+    // JAVA EQUIVALENT:
+    //  ByteBuffer buf = ByteBuffer.allocateDirect(length) // wraps C buffer
+    //  fis.readFully(position, buf);
+
+    jthrowable jthr;
+    jobject bb;
+
+    //Get the JNIEnv* corresponding to current thread
+    JNIEnv* env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return -1;
+    }
+
+    //Error checking... make sure that this file is 'readable'
+    if (f->type != HDFS_STREAM_INPUT) {
+        fprintf(stderr, "Cannot read from a non-InputStream object!\n");
+        errno = EINVAL;
+        return -1;
+    }
+
+    //Read the requisite bytes
+    bb = (*env)->NewDirectByteBuffer(env, buffer, length);
+    if (bb == NULL) {
+        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+                "preadFullyDirect: NewDirectByteBuffer");
+        return -1;
+    }
+
+    jthr = invokeMethod(env, NULL, INSTANCE, f->file,
+            JC_FS_DATA_INPUT_STREAM, "readFully",
+            "(JLjava/nio/ByteBuffer;)V", position, bb);
+    destroyLocalReference(env, bb);
+    if (jthr) {
+        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "preadFullyDirect: FSDataInputStream#readFully");
+        return -1;
+    }
+    return 0;
+}
+
tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length)
{
    // JAVA EQUIVALENT
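The "JAVA EQUIVALENT" comments above compress what the two new JNI paths do. Spelled out as plain Java (a sketch for orientation only; `fis` stands in for the FSDataInputStream behind the hdfsFile handle, and the method names are illustrative):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;

/** Slow path of hdfsPreadFully: readFully into a fresh byte[]; the C side
 *  then copies it out with GetByteArrayRegion. */
static byte[] preadFullySlowPath(FSDataInputStream fis, long position,
    int length) throws IOException {
  byte[] bR = new byte[length];             // NewByteArray(length)
  fis.readFully(position, bR, 0, length);   // invoked via "(J[BII)V"
  return bR;
}

/** Direct path (preadFullyDirect): the caller's C buffer is wrapped in a
 *  direct ByteBuffer, so no copy is needed after the read. */
static void preadFullyDirectPath(FSDataInputStream fis, long position,
    ByteBuffer directBuf) throws IOException {
  fis.readFully(position, directBuf);       // "(JLjava/nio/ByteBuffer;)V"
}
```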

+ 20 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h

@@ -600,7 +600,8 @@ extern  "C" {
    tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length);

    /** 
-     * hdfsPread - Positional read of data from an open file.
+     * hdfsPread - Positional read of data from an open file. Reads up to
+     * the specified number of bytes in length.
     * @param fs The configured filesystem handle.
     * @param file The file handle.
     * @param position Position from which to read
@@ -612,6 +613,24 @@ extern  "C" {
    tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position,
                    void* buffer, tSize length);

+    /**
+     * hdfsPreadFully - Positional read of data from an open file. Reads the
+     * specified number of bytes in length, or fails if the end of the data
+     * is reached first. Unlike hdfsRead and hdfsPread, this method does not
+     * return the number of bytes read because either (1) the entire length
+     * of the buffer is filled, or (2) the end of the file is reached. If the
+     * eof is reached, an exception is thrown and errno is set to EINTR.
+     * @param fs The configured filesystem handle.
+     * @param file The file handle.
+     * @param position Position from which to read
+     * @param buffer The buffer to copy read bytes into.
+     * @param length The length of the buffer.
+     * @return Returns 0 on success, -1 on error.
+     */
+    LIBHDFS_EXTERNAL
+    int hdfsPreadFully(hdfsFS fs, hdfsFile file, tOffset position,
+                    void* buffer, tSize length);
+

    /** 
     * hdfsWrite - Write data into an open file.

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c

@@ -317,6 +317,12 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position,
  return ret;
}

+int hdfsPreadFully(hdfsFS fs, hdfsFile file, tOffset position,
+                void* buffer, tSize length) {
+  return libhdfs_hdfsPreadFully(fs->libhdfsRep, file->libhdfsRep, position,
+          buffer, length);
+}
+
tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer,
                tSize length) {
  return libhdfs_hdfsWrite(fs->libhdfsRep, file->libhdfsRep, buffer, length);

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_defines.h

@@ -47,6 +47,7 @@
#define hdfsTell libhdfs_hdfsTell
#define hdfsRead libhdfs_hdfsRead
#define hdfsPread libhdfs_hdfsPread
+#define hdfsPreadFully libhdfs_hdfsPreadFully
#define hdfsWrite libhdfs_hdfsWrite
#define hdfsFlush libhdfs_hdfsFlush
#define hdfsHFlush libhdfs_hdfsHFlush

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h

@@ -47,6 +47,7 @@
#undef hdfsTell
#undef hdfsRead
#undef hdfsPread
+#undef hdfsPreadFully
#undef hdfsWrite
#undef hdfsFlush
#undef hdfsHFlush

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h

@@ -47,6 +47,7 @@
#define hdfsTell libhdfspp_hdfsTell
#define hdfsRead libhdfspp_hdfsRead
#define hdfsPread libhdfspp_hdfsPread
+#define hdfsPreadFully libhdfspp_hdfsPreadFully
#define hdfsWrite libhdfspp_hdfsWrite
#define hdfsFlush libhdfspp_hdfsFlush
#define hdfsHFlush libhdfspp_hdfsHFlush

+ 24 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java

@@ -85,6 +85,7 @@ public class TestByteBufferPread {
    testPreadWithPositionedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
    testPreadWithLimitedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
    testPositionedPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
+    testPreadFullyWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
  }

  /**
@@ -97,6 +98,7 @@ public class TestByteBufferPread {
    testPreadWithPositionedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
    testPreadWithLimitedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
    testPositionedPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+    testPreadFullyWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
  }

  /**
@@ -122,7 +124,6 @@ public class TestByteBufferPread {
      byte[] bufferContents = new byte[FILE_SIZE];
      buffer.get(bufferContents);
      assertArrayEquals(bufferContents, fileContents);
-      buffer.position(buffer.limit());
    }
  }

@@ -157,7 +158,7 @@

  /**
   * Reads half of the testFile into the {@link ByteBuffer} by setting a
-   * {@link ByteBuffer#limit} on the buffer. Validates that only half of the
+   * {@link ByteBuffer#limit()} on the buffer. Validates that only half of the
   * testFile is loaded into the buffer.
   */
  private void testPreadWithLimitedByteBuffer(
@@ -191,7 +192,7 @@

  /**
   * Reads half of the testFile into the {@link ByteBuffer} by setting the
-   * {@link ByteBuffer#position} the half the size of the file. Validates that
+   * {@link ByteBuffer#position()} to half the size of the file. Validates that
   * only half of the testFile is loaded into the buffer.
   */
  private void testPreadWithPositionedByteBuffer(
@@ -257,6 +258,26 @@ public class TestByteBufferPread {
    }
  }

+  /**
+   * Reads the entire testFile using the preadFully API and validates that its
+   * contents are properly loaded into the supplied {@link ByteBuffer}.
+   */
+  private void testPreadFullyWithByteBuffer(ByteBuffer buffer)
+          throws IOException {
+    int totalBytesRead = 0;
+    try (FSDataInputStream in = fs.open(testFile)) {
+      in.readFully(totalBytesRead, buffer);
+      // Make sure the buffer is full
+      assertFalse(buffer.hasRemaining());
+      // Make sure the contents of the read buffer equal the contents of the
+      // file
+      buffer.position(0);
+      byte[] bufferContents = new byte[FILE_SIZE];
+      buffer.get(bufferContents);
+      assertArrayEquals(bufferContents, fileContents);
+    }
+  }
+
  @AfterClass
  public static void shutdown() throws IOException {
    try {