HADOOP-12073. Azure FileSystem PageBlobInputStream does not return -1 on EOF. Contributed by Ivan Mitic.

(cherry picked from commit c45784bc9031353b938f4756473937cca759b3dc)
cnauroth committed 10 years ago
commit f5b0cce7fa
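
For context, this fix brings the stream in line with the java.io.InputStream contract: read(byte[], int, int) must return 0 when the requested length is zero and -1 once the end of the stream is reached, and read() must return -1 at EOF. A minimal sketch of a consumer loop that depends on those semantics (the ByteArrayInputStream is illustrative, not part of this change):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class EofContractDemo {
  public static void main(String[] args) throws IOException {
    InputStream in = new ByteArrayInputStream(new byte[] {10, 20, 30});
    byte[] buf = new byte[2];
    int n;
    // read() returns the number of bytes read, or -1 at end of stream.
    // A stream that keeps returning 0 at EOF instead (the bug fixed by
    // this patch) would spin this loop forever.
    while ((n = in.read(buf, 0, buf.length)) != -1) {
      System.out.println("read " + n + " bytes");
    }
    System.out.println("EOF reached");
  }
}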

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -356,6 +356,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12054. RPC client should not retry for InvalidToken exceptions.
     (Varun Saxena via Arpit Agarwal)

+    HADOOP-12073. Azure FileSystem PageBlobInputStream does not return -1 on
+    EOF. (Ivan Mitic via cnauroth)
+
Release 2.7.1 - UNRELEASED

  INCOMPATIBLE CHANGES

+ 1 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java

@@ -2301,7 +2301,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
    throws AzureException {
    if (blob instanceof CloudPageBlobWrapper) {
      try {
-        return PageBlobInputStream.getPageBlobSize((CloudPageBlobWrapper) blob,
+        return PageBlobInputStream.getPageBlobDataSize((CloudPageBlobWrapper) blob,
            getInstrumentedContext(
                isConcurrentOOBAppendAllowed()));
      } catch (Exception e) {

+ 25 - 7
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java

@@ -80,7 +80,7 @@ final class PageBlobInputStream extends InputStream {
   * @throws IOException If the format is corrupt.
   * @throws StorageException If anything goes wrong in the requests.
   */
-  public static long getPageBlobSize(CloudPageBlobWrapper blob,
+  public static long getPageBlobDataSize(CloudPageBlobWrapper blob,
      OperationContext opContext) throws IOException, StorageException {
    // Get the page ranges for the blob. There should be one range starting
    // at byte 0, but we tolerate (and ignore) ranges after the first one.
@@ -156,7 +156,7 @@ final class PageBlobInputStream extends InputStream {
    }
    if (pageBlobSize == -1) {
      try {
-        pageBlobSize = getPageBlobSize(blob, opContext);
+        pageBlobSize = getPageBlobDataSize(blob, opContext);
      } catch (StorageException e) {
        throw new IOException("Unable to get page blob size.", e);
      }
@@ -179,7 +179,13 @@ final class PageBlobInputStream extends InputStream {

  /**
   * Check our buffer and download more from the server if needed.
-   * @return true if there's more data in the buffer, false if we're done.
+   * If no data is available in the buffer, the method downloads up to the
+   * maximum page blob download size (4MB), or all remaining pages if less
+   * than 4MB is left.
+   * If we are on the last page, the method returns true even if we have
+   * reached the end of the stream.
+   * @return true if there's more data in the buffer, false if the buffer is
+   *         empty and we reached the end of the blob.
   * @throws IOException
   */
  private synchronized boolean ensureDataInBuffer() throws IOException {
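
The refill behavior described in the Javadoc above follows the usual buffered-stream shape. Below is a simplified, self-contained sketch of that logic under stated assumptions: fetchPages is a hypothetical stand-in for the Azure page download, and MAX_DOWNLOAD mirrors the 4MB maximum mentioned in the comment; none of these names come from the actual class.

import java.io.IOException;

class BufferRefillSketch {
  // Mirrors the 4MB maximum download size mentioned above (assumed).
  private static final int MAX_DOWNLOAD = 4 * 1024 * 1024;
  private final long blobDataSize;
  private long nextFetchPosition = 0;
  private byte[] buffer = new byte[0];
  private int offsetInBuffer = 0;

  BufferRefillSketch(long blobDataSize) {
    this.blobDataSize = blobDataSize;
  }

  // Hypothetical server round trip returning `size` bytes at `position`.
  private byte[] fetchPages(long position, int size) throws IOException {
    return new byte[size];
  }

  boolean ensureDataInBuffer() throws IOException {
    if (offsetInBuffer < buffer.length) {
      return true; // unread data still in the buffer
    }
    long remaining = blobDataSize - nextFetchPosition;
    if (remaining <= 0) {
      return false; // buffer exhausted and end of blob reached
    }
    // Download up to 4MB, or all remaining data if less is left.
    int toFetch = (int) Math.min(remaining, MAX_DOWNLOAD);
    buffer = fetchPages(nextFetchPosition, toFetch);
    nextFetchPosition += toFetch;
    offsetInBuffer = 0;
    return true;
  }
}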
@@ -257,11 +263,15 @@ final class PageBlobInputStream extends InputStream {
  @Override
  public synchronized int read(byte[] outputBuffer, int offset, int len)
      throws IOException {
+    // If len is zero return 0 per the InputStream contract
+    if (len == 0) {
+      return 0;
+    }
+
    int numberOfBytesRead = 0;
    while (len > 0) {
      if (!ensureDataInBuffer()) {
-        filePosition += numberOfBytesRead;
-        return numberOfBytesRead;
+        break;
      }
      int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage();
      int numBytesToRead = Math.min(len, bytesRemainingInCurrentPage);
@@ -277,6 +287,13 @@ final class PageBlobInputStream extends InputStream {
        currentOffsetInBuffer += numBytesToRead;
      }
    }
+
+    // If len was > 0 and zero bytes were read, we reached
+    // EOF.
+    if (numberOfBytesRead == 0) {
+      return -1;
+    }
+
    filePosition += numberOfBytesRead;
    return numberOfBytesRead;
  }
@@ -284,8 +301,9 @@ final class PageBlobInputStream extends InputStream {
  @Override
  public int read() throws IOException {
    byte[] oneByte = new byte[1];
-    if (read(oneByte) == 0) {
-      return -1;
+    int result = read(oneByte);
+    if (result < 0) {
+      return result;
    }
    return oneByte[0];
  }
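
The old single-byte read treated a return value of 0 from read(oneByte) as EOF; now that the buffered read correctly returns -1 at end of stream, the check becomes a simple sign test. A minimal sketch of a caller relying on the repaired contract (the ByteArrayInputStream is illustrative):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class SingleByteReadDemo {
  public static void main(String[] args) throws IOException {
    InputStream in = new ByteArrayInputStream(new byte[] {10});
    int value;
    // read() yields one byte per call and -1 once the stream is exhausted.
    while ((value = in.read()) != -1) {
      System.out.println("byte: " + value);
    }
  }
}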

+ 8 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobOutputStream.java

@@ -117,6 +117,8 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
  // The last task given to the ioThreadPool to execute, to allow
  // waiting until it's done.
  private WriteRequest lastQueuedTask;
+  // Whether the stream has been closed.
+  private boolean closed = false;

  public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class);

@@ -201,7 +203,11 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
   * service.
   */
  @Override
-  public void close() throws IOException {
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+
    LOG.debug("Closing page blob output stream.");
    flush();
    checkStreamState();
@@ -221,7 +227,7 @@ final class PageBlobOutputStream extends OutputStream implements Syncable {
      Thread.currentThread().interrupt();
    }

-    this.lastError = new IOException("Stream is already closed.");
+    closed = true;
  }

  // Log the stacks of all threads.
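
Guarding close() with a synchronized closed flag is the standard idempotent-close pattern: repeated or concurrent close() calls become no-ops instead of poisoning the stream with a lastError. A minimal sketch of the pattern in isolation (not the actual class):

import java.io.IOException;
import java.io.OutputStream;

class IdempotentCloseStream extends OutputStream {
  private boolean closed = false;

  @Override
  public void write(int b) throws IOException {
    if (closed) {
      throw new IOException("Stream is closed.");
    }
    // ... buffer or transmit the byte ...
  }

  @Override
  public synchronized void close() throws IOException {
    if (closed) {
      return; // second and later calls are no-ops
    }
    // ... flush buffers and release resources ...
    closed = true;
  }
}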

+ 77 - 2
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java

@@ -41,7 +41,6 @@ import java.util.TimeZone;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -54,7 +53,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
 import org.apache.hadoop.fs.azure.AzureException;
 import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending;

@@ -472,6 +470,83 @@ public abstract class NativeAzureFileSystemBaseTest {
     }
   }

+  @Test
+  public void testInputStreamReadWithZeroSizeBuffer() throws Exception {
+    Path newFile = new Path("zeroSizeRead");
+    OutputStream output = fs.create(newFile);
+    output.write(10);
+    output.close();
+
+    InputStream input = fs.open(newFile);
+    int result = input.read(new byte[2], 0, 0);
+    assertEquals(0, result);
+  }
+
+  @Test
+  public void testInputStreamReadWithBufferReturnsMinusOneOnEof() throws Exception {
+    Path newFile = new Path("eofRead");
+    OutputStream output = fs.create(newFile);
+    output.write(10);
+    output.close();
+
+    // Read first byte back
+    InputStream input = fs.open(newFile);
+    byte[] buff = new byte[1];
+    int result = input.read(buff, 0, 1);
+    assertEquals(1, result);
+    assertEquals(10, buff[0]);
+
+    // Issue another read and make sure it returns -1
+    buff[0] = 2;
+    result = input.read(buff, 0, 1);
+    assertEquals(-1, result);
+    // Buffer is intact
+    assertEquals(2, buff[0]);
+  }
+
+  @Test
+  public void testInputStreamReadWithBufferReturnsMinusOneOnEofForLargeBuffer() throws Exception {
+    Path newFile = new Path("eofRead2");
+    OutputStream output = fs.create(newFile);
+    byte[] outputBuff = new byte[97331];
+    for(int i = 0; i < outputBuff.length; ++i) {
+      outputBuff[i] = (byte)(Math.random() * 255);
+    }
+    output.write(outputBuff);
+    output.close();
+
+    // Read the content of the file
+    InputStream input = fs.open(newFile);
+    byte[] buff = new byte[131072];
+    int result = input.read(buff, 0, buff.length);
+    assertEquals(outputBuff.length, result);
+    for(int i = 0; i < outputBuff.length; ++i) {
+      assertEquals(outputBuff[i], buff[i]);
+    }
+
+    // Issue another read and make sure it returns -1
+    buff = new byte[131072];
+    result = input.read(buff, 0, buff.length);
+    assertEquals(-1, result);
+  }
+
+  @Test
+  public void testInputStreamReadIntReturnsMinusOneOnEof() throws Exception {
+    Path newFile = new Path("eofRead3");
+    OutputStream output = fs.create(newFile);
+    output.write(10);
+    output.close();
+
+    // Read first byte back
+    InputStream input = fs.open(newFile);
+    int value = input.read();
+    assertEquals(10, value);
+
+    // Issue another read and make sure it returns -1
+    value = input.read();
+    assertEquals(-1, value);
+  }
+
   @Test
   public void testSetPermissionOnFile() throws Exception {
     Path newFile = new Path("testPermission");

+ 90 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemContractPageBlobLive.java

@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.junit.Ignore;
+
+public class TestNativeAzureFileSystemContractPageBlobLive extends
+    FileSystemContractBaseTest {
+  private AzureBlobStorageTestAccount testAccount;
+
+  private AzureBlobStorageTestAccount createTestAccount()
+      throws Exception {
+    Configuration conf = new Configuration();
+
+    // Configure the page blob directories key so every file created is a page blob.
+    conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+    // Configure the atomic rename directories key so every folder will have
+    // atomic rename applied.
+    conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
+    return AzureBlobStorageTestAccount.create(conf);
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    testAccount = createTestAccount();
+    if (testAccount != null) {
+      fs = testAccount.getFileSystem();
+    }
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    if (testAccount != null) {
+      testAccount.cleanup();
+      testAccount = null;
+      fs = null;
+    }
+  }
+
+  @Override
+  protected void runTest() throws Throwable {
+    if (testAccount != null) {
+      super.runTest();
+    }
+  }
+  
+  /**
+   * The following tests are failing on Azure and the Azure 
+   * file system code needs to be modified to make them pass.
+   * A separate work item has been opened for this.
+   */
+  @Ignore
+  public void testMoveFileUnderParent() throws Throwable {
+  }
+
+  @Ignore
+  public void testRenameFileToSelf() throws Throwable {
+  }
+  
+  @Ignore
+  public void testRenameChildDirForbidden() throws Exception {
+  }
+  
+  @Ignore
+  public void testMoveDirUnderParent() throws Throwable {
+  }
+  
+  @Ignore
+  public void testRenameDirToSelf() throws Throwable {
+  }
+}