
HADOOP-891: Make page and block blob input streams return 0 for available() only if at EOF

Eric Hanson 11 years ago
Parent
Commit
87ff91e0e7
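
The contract this patch targets is the one the new javadoc cites from DFSInputStream: available() stays positive while unread bytes remain and drops to 0 only at end of file, rather than reporting whatever happens to sit in an internal buffer. The sketch below is illustrative only, a consumer loop of the kind HBase write-ahead-log tailing relies on; the path and buffer size are hypothetical and not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: consumes a (hypothetical) file while trusting that
// available() returns 0 only once every byte has been read.
public class AvailableContractExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    FSDataInputStream in = fs.open(new Path("/example/wal.dat")); // hypothetical path
    byte[] buf = new byte[4096];
    try {
      // Before this patch, a page/block blob stream could report 0 here while
      // unread bytes remained, ending the loop too early.
      while (in.available() > 0) {
        int n = in.read(buf, 0, buf.length);
        if (n < 0) {
          break; // defensive: real end of file
        }
        // process n bytes of log data ...
      }
    } finally {
      in.close();
    }
  }
}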

+ 34 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/azurenative/NativeAzureFileSystem.java

@@ -118,10 +118,40 @@ public class NativeAzureFileSystem extends FileSystem {
     private InputStream in;
     private final String key;
     private long pos = 0;
+    private boolean closed = false;
+    private boolean isPageBlob;
 
-    public NativeAzureFsInputStream(DataInputStream in, String key) {
+    // File length, valid only for streams over block blobs.
+    private long fileLength;
+
+    public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) {
       this.in = in;
       this.key = key;
+      this.isPageBlob = store.isPageBlobKey(key);
+      this.fileLength = fileLength;
+    }
+
+    /**
+     * Return the size of the remaining available bytes
+     * if the size is less than or equal to {@link Integer#MAX_VALUE},
+     * otherwise, return {@link Integer#MAX_VALUE}.
+     *
+     * This is to match the behavior of DFSInputStream.available(),
+     * which some clients may rely on (HBase write-ahead log reading in
+     * particular).
+     */
+    @Override
+    public int available() throws IOException {
+      if (isPageBlob) {
+        return in.available();
+      } else {
+        if (closed) {
+          throw new IOException("Stream closed");
+        }
+        final long remaining = this.fileLength - pos;
+        return remaining <= Integer.MAX_VALUE ?
+            (int) remaining : Integer.MAX_VALUE;
+      }
     }
 
     /*
@@ -189,6 +219,7 @@ public class NativeAzureFileSystem extends FileSystem {
     @Override
     public void close() throws IOException {
       in.close();
+      closed = true;
     }
 
     @Override
@@ -1147,7 +1178,7 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     return new FSDataInputStream(new BufferedFSInputStream(
-        new NativeAzureFsInputStream(store.retrieve(key), key), bufferSize));
+        new NativeAzureFsInputStream(store.retrieve(key), key, meta.getLength()), bufferSize));
   }
 
   @Override
@@ -1396,7 +1427,7 @@ public class NativeAzureFileSystem extends FileSystem {
     if (isClosed) {
       return;
     }
-    
+
     // Call the base close() to close any resources there.
     super.close();
     // Close the store to close any resources there - e.g. the bandwidth

+ 41 - 17
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/azurenative/PageBlobInputStream.java

@@ -17,7 +17,6 @@ import static org.apache.hadoop.fs.azurenative.PageBlobFormatHelpers.*;
  * An input stream that reads file data from a page blob stored
  * using ASV's custom format.
  */
-// TODO: Make this seekable.
 final class PageBlobInputStream extends InputStream {
   private static final Log LOG = LogFactory.getLog(PageBlobInputStream.class);
 
@@ -37,6 +36,12 @@ final class PageBlobInputStream extends InputStream {
   // Maximum number of pages to get per any one request.
   private static final int MAX_PAGES_PER_DOWNLOAD =
       4 * 1024 * 1024 / PAGE_SIZE;
+  // Whether the stream has been closed.
+  private boolean closed = false;
+  // Total stream size, or -1 if not initialized.
+  long pageBlobSize = -1;
+  // Current position in stream of valid data.
+  long filePosition = 0;
 
   /**
    * Helper method to extract the actual data size of a page blob.
@@ -64,14 +69,14 @@ final class PageBlobInputStream extends InputStream {
       throw badStartRangeException(blob, pageRanges.get(0));
     }
     long totalRawBlobSize = pageRanges.get(0).getEndOffset() + 1;
-    
+
     // Get the last page.
     long lastPageStart = totalRawBlobSize - PAGE_SIZE;
-    ByteArrayOutputStream baos = 
+    ByteArrayOutputStream baos =
         new ByteArrayOutputStream (PageBlobFormatHelpers.PAGE_SIZE);
     blob.downloadRange(lastPageStart, PAGE_SIZE, baos,
         new BlobRequestOptions(), opContext);
-    
+
     byte[] lastPage = baos.toByteArray();
     short lastPageSize = getPageSize(blob, lastPage, 0);
     long totalNumberOfPages = totalRawBlobSize / PAGE_SIZE;
@@ -110,25 +115,35 @@ final class PageBlobInputStream extends InputStream {
     }
   }
 
-  /*
-   * Returns an estimate of the number of bytes that can be read (or skipped
-   * over) from this input stream without blocking by the next invocation of
-   * a method for this input stream. The next invocation might be the same
-   * thread or another thread. A single read or skip of this many bytes will
-   * not block, but may read or skip fewer bytes.
+  /** Return the size of the remaining available bytes
+   * if the size is less than or equal to {@link Integer#MAX_VALUE},
+   * otherwise, return {@link Integer#MAX_VALUE}.
+   *
+   * This is to match the behavior of DFSInputStream.available(),
+   * which some clients may rely on (HBase write-ahead log reading in
+   * particular).
    */
   @Override
   public synchronized int available() throws IOException {
-    if (currentBuffer == null) {
-      return 0;
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    if (pageBlobSize == -1) {
+      try {
+        pageBlobSize = GetPageBlobSize(blob, opContext);
+      } catch (StorageException e) {
+        throw new IOException("Unable to get page blob size.", e);
+      }
     }
-    // This is not very accurate because of the header bytes,
-    // but I'm supposed to return an estimate anyway.
-    return currentBuffer.length - currentOffsetInBuffer;
+
+    final long remaining = pageBlobSize - filePosition;
+    return remaining <= Integer.MAX_VALUE ?
+        (int) remaining : Integer.MAX_VALUE;
   }
 
   @Override
-  public void close() throws IOException {
+  public synchronized void close() throws IOException {
+    closed = true;
   }
 
   private boolean dataAvailableInBuffer() {
@@ -154,7 +169,7 @@ final class PageBlobInputStream extends InputStream {
     final long pagesToRead = Math.min(MAX_PAGES_PER_DOWNLOAD,
         numberOfPagesRemaining);
     final int bufferSize = (int)(pagesToRead * PAGE_SIZE);
- 
+
     // Download page to current buffer.
     try {
       // Create a byte array output stream to capture the results of the
@@ -219,6 +234,7 @@ final class PageBlobInputStream extends InputStream {
     int numberOfBytesRead = 0;
     while (len > 0) {
       if (!ensureDataInBuffer()) {
+        filePosition += numberOfBytesRead;
         return numberOfBytesRead;
       }
       int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage();
@@ -235,6 +251,7 @@ final class PageBlobInputStream extends InputStream {
         currentOffsetInBuffer += numBytesToRead;
       }
     }
+    filePosition += numberOfBytesRead;
     return numberOfBytesRead;
   }
 
@@ -254,6 +271,13 @@ final class PageBlobInputStream extends InputStream {
    */
   @Override
   public synchronized long skip(long n) throws IOException {
+    long skipped = skipImpl(n);
+    filePosition += skipped; // track the position in the stream
+    return skipped;
+  }
+
+  private long skipImpl(long n) throws IOException {
+
     if (n == 0) {
       return 0;
     }

+ 64 - 9
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/azurenative/NativeAzureFileSystemBaseTest.java

@@ -152,14 +152,14 @@ public abstract class NativeAzureFileSystemBaseTest {
     fs.delete(testFolder, true);
   }
 
-  void testDeepFileCreationBase(String testFilePath, String firstDirPath, String middleDirPath, 
-          short permissionShort, short umaskedPermissionShort) throws Exception  {    
+  void testDeepFileCreationBase(String testFilePath, String firstDirPath, String middleDirPath,
+          short permissionShort, short umaskedPermissionShort) throws Exception  {
     Path testFile = new Path(testFilePath);
     Path firstDir = new Path(firstDirPath);
-    Path middleDir = new Path(middleDirPath);    
-    FsPermission permission = FsPermission.createImmutable(permissionShort);  
-    FsPermission umaskedPermission = FsPermission.createImmutable(umaskedPermissionShort); 
-      
+    Path middleDir = new Path(middleDirPath);
+    FsPermission permission = FsPermission.createImmutable(permissionShort);
+    FsPermission umaskedPermission = FsPermission.createImmutable(umaskedPermissionShort);
+
     createEmptyFile(testFile, permission);
     FsPermission rootPerm = fs.getFileStatus(firstDir.getParent()).getPermission();
     FsPermission inheritPerm = FsPermission.createImmutable((short)(rootPerm.toShort() | 0300));
@@ -173,7 +173,7 @@ public abstract class NativeAzureFileSystemBaseTest {
     // verify that the file itself has the permissions as specified
     FileStatus fileStatus = fs.getFileStatus(testFile);
     assertFalse(fileStatus.isDirectory());
-    assertEqualsIgnoreStickyBit(umaskedPermission, fileStatus.getPermission());    
+    assertEqualsIgnoreStickyBit(umaskedPermission, fileStatus.getPermission());
     assertTrue(fs.delete(firstDir, true));
     assertFalse(fs.exists(testFile));
 
@@ -181,14 +181,14 @@ public abstract class NativeAzureFileSystemBaseTest {
     // and then check for the existence of the upper folders still. But that
     // doesn't actually work as expected right now.
   }
-  
+
   @Test
   public void testDeepFileCreation() throws Exception {
     // normal permissions in user home
     testDeepFileCreationBase("deep/file/creation/test", "deep", "deep/file/creation", (short)0644, (short)0644);
     // extra permissions in user home. umask will change the actual permissions.
     testDeepFileCreationBase("deep/file/creation/test", "deep", "deep/file/creation", (short)0777, (short)0755);
-    // normal permissions in root    
+    // normal permissions in root
     testDeepFileCreationBase("/deep/file/creation/test", "/deep", "/deep/file/creation", (short)0644, (short)0644);
     // less permissions in root
     testDeepFileCreationBase("/deep/file/creation/test", "/deep", "/deep/file/creation", (short)0700, (short)0700);
@@ -557,6 +557,61 @@ public abstract class NativeAzureFileSystemBaseTest {
     fs.close();
   }
 
+  // Test the available() method for the input stream returned by fs.open().
+  // This works for both page and block blobs.
+  int FILE_SIZE = 4 * 1024 * 1024 + 1; // Make this 1 bigger than internal
+                                       // buffer used in BlobInputStream
+                                       // to exercise that case.
+  int MAX_STRIDE = FILE_SIZE + 1;
+  Path PATH = new Path("/available.dat");
+  @Test
+  public void testAvailable() throws IOException {
+
+    // write FILE_SIZE bytes to page blob
+    FSDataOutputStream out = fs.create(PATH);
+    byte[] data = new byte[FILE_SIZE];
+    Arrays.fill(data, (byte) 5);
+    out.write(data, 0, FILE_SIZE);
+    out.close();
+
+    // Test available() for different read sizes
+    verifyAvailable(1);
+    verifyAvailable(100);
+    verifyAvailable(5000);
+    verifyAvailable(FILE_SIZE);
+    verifyAvailable(MAX_STRIDE);
+
+    fs.delete(PATH, false);
+  }
+
+  // Verify that available() for the input stream is always >= 1 unless we've
+  // consumed all the input, and then it is 0. This is to match expectations by
+  // HBase which were set based on behavior of DFSInputStream.available().
+  private void verifyAvailable(int readStride) throws IOException {
+    FSDataInputStream in = fs.open(PATH);
+    try {
+      byte[] inputBuffer = new byte[MAX_STRIDE];
+      int position = 0;
+      int bytesRead = 0;
+      while(bytesRead != FILE_SIZE) {
+        bytesRead += in.read(inputBuffer, position, readStride);
+        int available = in.available();
+        if (bytesRead < FILE_SIZE) {
+          if (available < 1) {
+            fail(String.format(
+                  "expected available > 0 but got: "
+                      + "position = %d, bytesRead = %d, in.available() = %d",
+                  position, bytesRead, available));
+          }
+        }
+      }
+      int available = in.available();
+      assertTrue(available == 0);
+    } finally {
+      in.close();
+    }
+  }
+
   private boolean testModifiedTime(Path testPath, long time) throws Exception {
   	FileStatus fileStatus = fs.getFileStatus(testPath);
   	final long errorMargin = modifiedTimeErrorMargin;

+ 4 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/azurenative/TestReadAndSeekPageBlobAfterWrite.java

@@ -21,14 +21,18 @@ package org.apache.hadoop.fs.azurenative;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeNotNull;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Random;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.BufferedFSInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;