
HADOOP-14535 wasb: implement high-performance random access and seek of block blobs.
Contributed by Thomas Marquardt

(cherry picked from commit d670c3a4da7dd80dccf6c6308603bb3bb013b3b0)

Committed by Steve Loughran
commit 41e83b2ca2
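
This change adds a buffered, seekable input stream for block blobs, selected by a new (internal, test-only) configuration key. A minimal sketch of toggling it, assuming a wasb filesystem of your own (the URI, container, and path below are illustrative; the key and its default of 2 come from the AzureNativeFileSystemStore diff):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WasbSeekDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 2 (the default) selects the new BlockBlobInputStream;
        // 1 restores the Azure SDK's BlobInputStream.
        conf.setInt("fs.azure.input.stream.version.for.internal.use.only", 2);
        FileSystem fs = FileSystem.get(
            URI.create("wasb://mycontainer@myaccount.blob.core.windows.net/"), conf);
        try (FSDataInputStream in = fs.open(new Path("/data/sample.bin"))) {
          in.seek(2 * 1024 * 1024);   // seeks no longer drain or reopen the stream
          byte[] buf = new byte[4096];
          int bytesRead = in.read(buf);
          System.out.println("read " + bytesRead + " bytes after seek");
        }
      }
    }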

+ 8 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java

@@ -1459,6 +1459,14 @@ public class ContractTestUtils extends Assert {
       return now() - startTime;
     }
 
+    /**
+     * Elapsed time in milliseconds; no rounding.
+     * @return elapsed time
+     */
+    public long elapsedTimeMs() {
+      return elapsedTime() / 1000000;
+    }
+
     public double bandwidth(long bytes) {
       return bandwidthMBs(bytes, duration());
     }
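
For reference, NanoTimer measures elapsed wall-clock nanoseconds; the new helper simply truncates to milliseconds (integer division, hence "no rounding"). A usage sketch matching the pattern in the new test further below:

    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
    // ... the operation being timed ...
    long elapsedMs = timer.elapsedTimeMs();   // nanoseconds / 1000000, truncated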

+ 35 - 43
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java

@@ -19,7 +19,6 @@
 package org.apache.hadoop.fs.azure;
 import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER;
 
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -121,6 +120,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private static final String KEY_STREAM_MIN_READ_SIZE = "fs.azure.read.request.size";
   private static final String KEY_STORAGE_CONNECTION_TIMEOUT = "fs.azure.storage.timeout";
   private static final String KEY_WRITE_BLOCK_SIZE = "fs.azure.write.request.size";
+  @VisibleForTesting
+  static final String KEY_INPUT_STREAM_VERSION = "fs.azure.input.stream.version.for.internal.use.only";
 
   // Property controlling whether to allow reads on blob which are concurrently
   // appended out-of-band.
@@ -222,6 +223,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   public static final int DEFAULT_DOWNLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
   public static final int DEFAULT_UPLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
 
+  private static final int DEFAULT_INPUT_STREAM_VERSION = 2;
+
   // Retry parameter defaults.
   //
 
@@ -280,6 +283,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
   private int downloadBlockSizeBytes = DEFAULT_DOWNLOAD_BLOCK_SIZE;
   private int uploadBlockSizeBytes = DEFAULT_UPLOAD_BLOCK_SIZE;
+  private int inputStreamVersion = DEFAULT_INPUT_STREAM_VERSION;
 
   // Bandwidth throttling exponential back-off parameters
   //
@@ -691,6 +695,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     this.uploadBlockSizeBytes = sessionConfiguration.getInt(
         KEY_WRITE_BLOCK_SIZE, DEFAULT_UPLOAD_BLOCK_SIZE);
 
+    this.inputStreamVersion = sessionConfiguration.getInt(
+        KEY_INPUT_STREAM_VERSION, DEFAULT_INPUT_STREAM_VERSION);
+
     // The job may want to specify a timeout to use when engaging the
     // storage service. The default is currently 90 seconds. It may
     // be necessary to increase this value for long latencies in larger
@@ -1417,8 +1424,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   private InputStream openInputStream(CloudBlobWrapper blob)
       throws StorageException, IOException {
     if (blob instanceof CloudBlockBlobWrapper) {
-      return blob.openInputStream(getDownloadOptions(),
-          getInstrumentedContext(isConcurrentOOBAppendAllowed()));
+      LOG.debug("Using stream seek algorithm {}", inputStreamVersion);
+      switch(inputStreamVersion) {
+      case 1:
+        return blob.openInputStream(getDownloadOptions(),
+            getInstrumentedContext(isConcurrentOOBAppendAllowed()));
+      case 2:
+        return new BlockBlobInputStream((CloudBlockBlobWrapper) blob,
+            getDownloadOptions(),
+            getInstrumentedContext(isConcurrentOOBAppendAllowed()));
+      default:
+        throw new IOException("Unknown seek algorithm: " + inputStreamVersion);
+      }
     } else {
       return new PageBlobInputStream(
           (CloudPageBlobWrapper) blob, getInstrumentedContext(
@@ -2023,32 +2040,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   }
 
   @Override
-  public DataInputStream retrieve(String key) throws AzureException, IOException {
-      try {
-        // Check if a session exists, if not create a session with the
-        // Azure storage server.
-        if (null == storageInteractionLayer) {
-          final String errMsg = String.format(
-              "Storage session expected for URI '%s' but does not exist.",
-              sessionUri);
-          throw new AssertionError(errMsg);
-        }
-        checkContainer(ContainerAccessType.PureRead);
-
-        // Get blob reference and open the input buffer stream.
-        CloudBlobWrapper blob = getBlobReference(key);
-
-        // Return a data input stream.
-        DataInputStream inDataStream = new DataInputStream(openInputStream(blob));
-        return inDataStream;
-    } catch (Exception e) {
-      // Re-throw as an Azure storage exception.
-      throw new AzureException(e);
-    }
+  public InputStream retrieve(String key) throws AzureException, IOException {
+    return retrieve(key, 0);
   }
 
   @Override
-  public DataInputStream retrieve(String key, long startByteOffset)
+  public InputStream retrieve(String key, long startByteOffset)
       throws AzureException, IOException {
       try {
         // Check if a session exists, if not create a session with the
@@ -2061,24 +2058,19 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         }
         checkContainer(ContainerAccessType.PureRead);
 
-        // Get blob reference and open the input buffer stream.
-        CloudBlobWrapper blob = getBlobReference(key);
-
-        // Open input stream and seek to the start offset.
-        InputStream in = blob.openInputStream(
-          getDownloadOptions(), getInstrumentedContext(isConcurrentOOBAppendAllowed()));
-
-        // Create a data input stream.
-	    DataInputStream inDataStream = new DataInputStream(in);
-	    
-	    // Skip bytes and ignore return value. This is okay
-	    // because if you try to skip too far you will be positioned
-	    // at the end and reads will not return data.
-	    inDataStream.skip(startByteOffset);
-        return inDataStream;
+        InputStream inputStream = openInputStream(getBlobReference(key));
+        if (startByteOffset > 0) {
+          // Skip bytes and ignore return value. This is okay
+          // because if you try to skip too far you will be positioned
+          // at the end and reads will not return data.
+          inputStream.skip(startByteOffset);
+        }
+        return inputStream;
+    } catch (IOException e) {
+        throw e;
     } catch (Exception e) {
-      // Re-throw as an Azure storage exception.
-      throw new AzureException(e);
+        // Re-throw as an Azure storage exception.
+        throw new AzureException(e);
     }
   }
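
Both retrieve() overloads now funnel through openInputStream(), so a positioned open is just open-plus-skip. A hedged caller-side sketch (store is a NativeFileSystemStore from this package; the key and offset are illustrative):

    InputStream in = store.retrieve("user/data/part-00000", 4096L);
    // per the comment above, the skip return value is intentionally ignored;
    // reads return -1 if the offset was at or past the end of the blob
    int first = in.read();   // first byte at offset 4096, or -1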
 

+ 396 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobInputStream.java

@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
+
+/**
+ * Encapsulates the BlobInputStream used by block blobs and adds support for
+ * random access and seek. Random access performance is improved by several
+ * orders of magnitude.
+ */
+final class BlockBlobInputStream extends InputStream implements Seekable {
+  private final CloudBlockBlobWrapper blob;
+  private final BlobRequestOptions options;
+  private final OperationContext opContext;
+  private InputStream blobInputStream = null;
+  private int minimumReadSizeInBytes = 0;
+  private long streamPositionAfterLastRead = -1;
+  private long streamPosition = 0;
+  private long streamLength = 0;
+  private boolean closed = false;
+  private byte[] streamBuffer;
+  private int streamBufferPosition;
+  private int streamBufferLength;
+
+  /**
+   * Creates a seek-able stream for reading from block blobs.
+   * @param blob a block blob reference.
+   * @param options the blob request options.
+   * @param opContext the blob operation context.
+   * @throws IOException IO failure
+   */
+  BlockBlobInputStream(CloudBlockBlobWrapper blob,
+      BlobRequestOptions options,
+      OperationContext opContext) throws IOException {
+    this.blob = blob;
+    this.options = options;
+    this.opContext = opContext;
+
+    this.minimumReadSizeInBytes = blob.getStreamMinimumReadSizeInBytes();
+
+    try {
+      this.blobInputStream = blob.openInputStream(options, opContext);
+    } catch (StorageException e) {
+      throw new IOException(e);
+    }
+
+    this.streamLength = blob.getProperties().getLength();
+  }
+
+  private void checkState() throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+  }
+
+  /**
+   * Gets the read position of the stream.
+   * @return the zero-based byte offset of the read position.
+   * @throws IOException IO failure
+   */
+  @Override
+  public synchronized long getPos() throws IOException {
+    checkState();
+    return streamPosition;
+  }
+
+  /**
+   * Sets the read position of the stream.
+   * @param pos a zero-based byte offset in the stream.
+   * @throws EOFException if read is out of range
+   */
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    checkState();
+    if (pos < 0) {
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
+    }
+    if (pos > streamLength) {
+      throw new EOFException(
+          FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
+    }
+    if (pos == getPos()) {
+      // no-op, no state change
+      return;
+    }
+
+    if (streamBuffer != null) {
+      long offset = streamPosition - pos;
+      if (offset > 0 && offset < streamBufferLength) {
+        streamBufferPosition = streamBufferLength - (int) offset;
+      } else {
+        streamBufferPosition = streamBufferLength;
+      }
+    }
+
+    streamPosition = pos;
+    // close BlobInputStream after seek is invoked because BlobInputStream
+    // does not support seek
+    closeBlobInputStream();
+  }
+
+  /**
+   * Seeks a secondary copy of the data.  This method is not supported.
+   * @param targetPos a zero-based byte offset in the stream.
+   * @return false
+   * @throws IOException IO failure
+   */
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  /**
+   * Gets the number of bytes that can be read (or skipped over) without
+   * performing a network operation.
+   * @throws IOException IO failure
+   */
+  @Override
+  public synchronized int available() throws IOException {
+    checkState();
+    if (blobInputStream != null) {
+      return blobInputStream.available();
+    } else {
+      return (streamBuffer == null)
+          ? 0
+          : streamBufferLength - streamBufferPosition;
+    }
+  }
+
+  private void closeBlobInputStream() throws IOException {
+    if (blobInputStream != null) {
+      try {
+        blobInputStream.close();
+      } finally {
+        blobInputStream = null;
+      }
+    }
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated with it.
+   * @throws IOException IO failure
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    closed = true;
+    closeBlobInputStream();
+    streamBuffer = null;
+    streamBufferPosition = 0;
+    streamBufferLength = 0;
+  }
+
+  private int doNetworkRead(byte[] buffer, int offset, int len)
+      throws IOException {
+    MemoryOutputStream outputStream;
+    boolean needToCopy = false;
+
+    if (streamPositionAfterLastRead == streamPosition) {
+      // caller is reading sequentially, so initialize the stream buffer
+      if (streamBuffer == null) {
+        streamBuffer = new byte[(int) Math.min(minimumReadSizeInBytes,
+            streamLength)];
+      }
+      streamBufferPosition = 0;
+      streamBufferLength = 0;
+      outputStream = new MemoryOutputStream(streamBuffer, streamBufferPosition,
+          streamBuffer.length);
+      needToCopy = true;
+    } else {
+      outputStream = new MemoryOutputStream(buffer, offset, len);
+    }
+
+    long bytesToRead = Math.min(
+        minimumReadSizeInBytes,
+        Math.min(
+            outputStream.capacity(),
+            streamLength - streamPosition));
+
+    try {
+      blob.downloadRange(streamPosition, bytesToRead, outputStream, options,
+          opContext);
+    } catch (StorageException e) {
+      throw new IOException(e);
+    }
+
+    int bytesRead = outputStream.size();
+    if (bytesRead > 0) {
+      streamPosition += bytesRead;
+      streamPositionAfterLastRead = streamPosition;
+      int count = Math.min(bytesRead, len);
+      if (needToCopy) {
+        streamBufferLength = bytesRead;
+        System.arraycopy(streamBuffer, streamBufferPosition, buffer, offset,
+            count);
+        streamBufferPosition += count;
+      }
+      return count;
+    } else {
+      // This may happen if the blob was modified after the length was obtained.
+      throw new EOFException("End of stream reached unexpectedly.");
+    }
+  }
+
+  /**
+   * Reads up to <code>len</code> bytes of data from the input stream into an
+   * array of bytes.
+   * @param b a buffer into which the data is written.
+   * @param offset a start offset into {@code buffer} where the data is written.
+   * @param len the maximum number of bytes to be read.
+   * @return the number of bytes written into {@code buffer}, or -1.
+   * @throws IOException IO failure
+   */
+  @Override
+  public synchronized int read(byte[] b, int offset, int len)
+      throws IOException {
+    checkState();
+    NativeAzureFileSystemHelper.validateReadArgs(b, offset, len);
+    if (blobInputStream != null) {
+      int numberOfBytesRead = blobInputStream.read(b, offset, len);
+      if (numberOfBytesRead > 0) {
+        // advance the position only on a successful read; -1 signals EOF
+        streamPosition += numberOfBytesRead;
+      }
+      return numberOfBytesRead;
+    } else {
+      if (offset < 0 || len < 0 || len > b.length - offset) {
+        throw new IndexOutOfBoundsException("read arguments out of range");
+      }
+      if (len == 0) {
+        return 0;
+      }
+
+      int bytesRead = 0;
+      int available = available();
+      if (available > 0) {
+        bytesRead = Math.min(available, len);
+        System.arraycopy(streamBuffer, streamBufferPosition, b, offset,
+            bytesRead);
+        streamBufferPosition += bytesRead;
+      }
+
+      if (len == bytesRead) {
+        return len;
+      }
+      if (streamPosition >= streamLength) {
+        return (bytesRead > 0) ? bytesRead : -1;
+      }
+
+      offset += bytesRead;
+      len -= bytesRead;
+
+      return bytesRead + doNetworkRead(b, offset, len);
+    }
+  }
+
+  /**
+   * Reads the next byte of data from the stream.
+   * @return the next byte of data, or -1
+   * @throws IOException IO failure
+   */
+  @Override
+  public int read() throws IOException {
+    byte[] buffer = new byte[1];
+    int numberOfBytesRead = read(buffer, 0, 1);
+    // mask to an unsigned value so bytes >= 0x80 are not returned as negative
+    return (numberOfBytesRead < 1) ? -1 : (buffer[0] & 0xFF);
+  }
+
+  /**
+   * Skips over and discards n bytes of data from this input stream.
+   * @param n the number of bytes to be skipped.
+   * @return the actual number of bytes skipped.
+   * @throws IOException IO failure
+   */
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    checkState();
+
+    if (blobInputStream != null) {
+      return blobInputStream.skip(n);
+    } else {
+      if (n < 0 || streamPosition + n > streamLength) {
+        throw new IndexOutOfBoundsException("skip range");
+      }
+
+      if (streamBuffer != null) {
+        streamBufferPosition = (n < streamBufferLength - streamBufferPosition)
+            ? streamBufferPosition + (int) n
+            : streamBufferLength;
+      }
+
+      streamPosition += n;
+      return n;
+    }
+  }
+
+  /**
+   * An <code>OutputStream</code> backed by a user-supplied buffer.
+   */
+  static class MemoryOutputStream extends OutputStream {
+    private final byte[] buffer;
+    private final int offset;
+    private final int length;
+    private int writePosition;
+
+    /**
+     * Creates a <code>MemoryOutputStream</code> from a user-supplied buffer.
+     * @param buffer an array of bytes.
+     * @param offset a starting offset in <code>buffer</code> where the data
+     * will be written.
+     * @param length the maximum number of bytes to be written to the stream.
+     */
+    MemoryOutputStream(byte[] buffer, int offset, int length) {
+      if (buffer == null) {
+        throw new NullPointerException("buffer");
+      }
+      if (offset < 0 || length < 0 || length > buffer.length - offset) {
+        throw new IndexOutOfBoundsException("offset out of range of buffer");
+      }
+      this.buffer = buffer;
+      this.offset = offset;
+      this.length = length;
+      this.writePosition = offset;
+    }
+
+    /**
+     * Gets the current size of the stream.
+     */
+    public synchronized int size() {
+      return writePosition - offset;
+    }
+
+    /**
+     * Gets the current capacity of the stream, i.e. the maximum number of
+     * bytes that may be written, as passed to the constructor.
+     */
+    public synchronized int capacity() {
+      // length already is the writable byte count; subtracting offset
+      // would under-report capacity whenever offset > 0
+      return length;
+    }
+
+    /**
+     * Writes the next byte to the stream.
+     * @param b the byte to be written.
+     * @throws IOException IO failure
+     */
+    public synchronized void write(int b) throws IOException {
+      if (size() > length - 1) {
+        throw new IOException("No space for more writes");
+      }
+      buffer[writePosition++] = (byte) b;
+    }
+
+    /**
+     * Writes a range of bytes to the stream.
+     * @param b a byte array.
+     * @param off the start offset in <code>b</code> from which the data
+     * is read.
+     * @param length the number of bytes to be written.
+     * @throws IOException IO failure
+     */
+    public synchronized void write(byte[] b, int off, int length)
+        throws IOException {
+      if (b == null) {
+        throw new NullPointerException("Null buffer argument");
+      }
+      if (off < 0 || length < 0 || length > b.length - off) {
+        throw new IndexOutOfBoundsException("array write offset");
+      }
+      if (length > capacity() - size()) {
+        // enforce the length limit, matching the single-byte write above
+        throw new IOException("No space for more writes");
+      }
+      System.arraycopy(b, off, buffer, writePosition, length);
+      writePosition += length;
+    }
+  }
+}
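
What makes this faster than the SDK's BlobInputStream is that seek() only adjusts state (and closes any pass-through stream); the next read then fetches a bounded range via downloadRange. A sketch of the access pattern this helps, assuming an already-opened wasb FileSystem fs and an illustrative path:

    Path path = new Path("/data/large-file.bin");
    long len = fs.getFileStatus(path).getLen();
    try (FSDataInputStream in = fs.open(path)) {
      byte[] footer = new byte[4096];
      in.seek(len - footer.length);   // no network I/O on the seek itself
      in.readFully(footer);           // one bounded range download
      in.seek(0);                     // backward seek: state reset only
      int firstByte = in.read();      // another bounded download, not a re-open
    }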

+ 11 - 27
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.fs.azure;
 
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
@@ -55,6 +54,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
 import org.apache.hadoop.fs.azure.security.Constants;
@@ -741,7 +741,7 @@ public class NativeAzureFileSystem extends FileSystem {
     // File length, valid only for streams over block blobs.
     private long fileLength;
 
-    public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) {
+    NativeAzureFsInputStream(InputStream in, String key, long fileLength) {
       this.in = in;
       this.key = key;
       this.isPageBlob = store.isPageBlobKey(key);
@@ -815,27 +815,6 @@ public class NativeAzureFileSystem extends FileSystem {
       }
     }
 
-    @Override
-    public synchronized  void readFully(long position, byte[] buffer, int offset, int length)
-        throws IOException {
-      validatePositionedReadArgs(position, buffer, offset, length);
-
-      int nread = 0;
-      while (nread < length) {
-        // In case BlobInputStream is used, mark() can act as a hint to read ahead only this
-        // length instead of 4 MB boundary.
-        in.mark(length - nread);
-        int nbytes = read(position + nread,
-            buffer,
-            offset + nread,
-            length - nread);
-        if (nbytes < 0) {
-          throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
-        }
-        nread += nbytes;
-      }
-    }
-
     /*
      * Reads up to len bytes of data from the input stream into an array of
      * bytes. An attempt is made to read as many as len bytes, but a smaller
@@ -907,9 +886,14 @@ public class NativeAzureFileSystem extends FileSystem {
           throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
         }
         if (this.pos > pos) {
-          IOUtils.closeStream(in);
-          in = store.retrieve(key);
-          this.pos = in.skip(pos);
+          if (in instanceof Seekable) {
+            ((Seekable) in).seek(pos);
+            this.pos = pos;
+          } else {
+            IOUtils.closeStream(in);
+            in = store.retrieve(key);
+            this.pos = in.skip(pos);
+          }
         } else {
           this.pos += in.skip(pos - this.pos);
         }
@@ -2546,7 +2530,7 @@ public class NativeAzureFileSystem extends FileSystem {
           + " is a directory not a file.");
     }
 
-    DataInputStream inputStream = null;
+    InputStream inputStream;
     try {
       inputStream = store.retrieve(key);
     } catch(Exception ex) {

+ 28 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java

@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.fs.azure;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,6 +31,8 @@ import com.microsoft.azure.storage.StorageErrorCodeStrings;
 import com.microsoft.azure.storage.StorageException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSExceptionMessages;
+
 /**
  * Utility class that has helper methods.
  *
@@ -104,4 +108,28 @@ final class NativeAzureFileSystemHelper {
       }
     }
   }
+
+  /**
+   * Validation code, based on
+   * {@code FSInputStream.validatePositionedReadArgs()}.
+   * @param buffer destination buffer
+   * @param offset offset within the buffer
+   * @param length length of bytes to read
+   * @throws EOFException if the position is negative
+   * @throws IndexOutOfBoundsException if there isn't space for the amount of
+   * data requested.
+   * @throws IllegalArgumentException other arguments are invalid.
+   */
+  static void validateReadArgs(byte[] buffer, int offset, int length)
+      throws EOFException {
+    Preconditions.checkArgument(length >= 0, "length is negative");
+    Preconditions.checkArgument(buffer != null, "Null buffer");
+    if (buffer.length - offset < length) {
+      throw new IndexOutOfBoundsException(
+          FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
+              + ": request length=" + length
+              + ", with offset =" + offset
+              + "; buffer capacity =" + (buffer.length - offset));
+    }
+  }
 }

+ 3 - 3
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java

@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.fs.azure;
 
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.Date;
 
@@ -46,9 +46,9 @@ interface NativeFileSystemStore {
 
   FileMetadata retrieveMetadata(String key) throws IOException;
 
-  DataInputStream retrieve(String key) throws IOException;
+  InputStream retrieve(String key) throws IOException;
 
-  DataInputStream retrieve(String key, long byteRangeStart) throws IOException;
+  InputStream retrieve(String key, long byteRangeStart) throws IOException;
 
   DataOutputStream storefile(String key, PermissionStatus permissionStatus)
       throws AzureException;

+ 5 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java

@@ -465,6 +465,11 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
       getBlob().uploadProperties(getLeaseCondition(lease), null, opContext);
     }
 
+    @Override
+    public int getStreamMinimumReadSizeInBytes() {
+        return getBlob().getStreamMinimumReadSizeInBytes();
+    }
+
     @Override
     public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
       getBlob().setStreamMinimumReadSizeInBytes(minimumReadSizeBytes);

+ 9 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java

@@ -582,10 +582,17 @@ abstract class StorageInterface {
         throws StorageException;
 
     SelfRenewingLease acquireLease() throws StorageException;
-    
+
+    /**
+     * Gets the minimum read block size to use with this Blob.
+     *
+     * @return The minimum block size, in bytes, for reading from a block blob.
+     */
+    int getStreamMinimumReadSizeInBytes();
+
     /**
      * Sets the minimum read block size to use with this Blob.
-     * 
+     *
      * @param minimumReadSizeBytes
      *          The maximum block size, in bytes, for reading from a block blob
      *          while using a {@link BlobInputStream} object, ranging from 512
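
The new getter completes a round trip with the existing setter, which the store configures from fs.azure.read.request.size and BlockBlobInputStream uses to size its internal buffer. A sketch against any CloudBlockBlobWrapper implementation (the blob variable is illustrative):

    blob.setStreamMinimumReadSizeInBytes(4 * 1024 * 1024);
    int readSize = blob.getStreamMinimumReadSizeInBytes();   // 4 MB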

+ 5 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java

@@ -398,6 +398,11 @@ class StorageInterfaceImpl extends StorageInterface {
       getBlob().uploadProperties(getLeaseCondition(lease), null, opContext);
     }
 
+    @Override
+    public int getStreamMinimumReadSizeInBytes() {
+        return getBlob().getStreamMinimumReadSizeInBytes();
+    }
+
     @Override
     public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
       getBlob().setStreamMinimumReadSizeInBytes(minimumReadSizeBytes);

+ 33 - 7
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java

@@ -82,13 +82,22 @@ public final class AzureBlobStorageTestAccount {
   private static final ConcurrentLinkedQueue<MetricsRecord> allMetrics =
       new ConcurrentLinkedQueue<MetricsRecord>();
   private static boolean metricsConfigSaved = false;
+  private boolean skipContainerDelete = false;
 
   private AzureBlobStorageTestAccount(NativeAzureFileSystem fs,
       CloudStorageAccount account,
       CloudBlobContainer container) {
+    this(fs, account, container, false);
+  }
+
+  private AzureBlobStorageTestAccount(NativeAzureFileSystem fs,
+      CloudStorageAccount account,
+      CloudBlobContainer container,
+      boolean skipContainerDelete) {
     this.account = account;
     this.container = container;
     this.fs = fs;
+    this.skipContainerDelete = skipContainerDelete;
   }
 
   /**
@@ -524,8 +533,19 @@ public final class AzureBlobStorageTestAccount {
     return create(containerNameSuffix, createOptions, null);
   }
 
-  public static AzureBlobStorageTestAccount create(String containerNameSuffix,
-      EnumSet<CreateOptions> createOptions, Configuration initialConfiguration)
+  public static AzureBlobStorageTestAccount create(
+      String containerNameSuffix,
+      EnumSet<CreateOptions> createOptions,
+      Configuration initialConfiguration)
+      throws Exception {
+    return create(containerNameSuffix, createOptions, initialConfiguration, false);
+  }
+
+  public static AzureBlobStorageTestAccount create(
+      String containerNameSuffix,
+      EnumSet<CreateOptions> createOptions,
+      Configuration initialConfiguration,
+      boolean useContainerSuffixAsContainerName)
       throws Exception {
     saveMetricsConfigFile();
     NativeAzureFileSystem fs = null;
@@ -538,12 +558,17 @@ public final class AzureBlobStorageTestAccount {
       return null;
     }
     fs = new NativeAzureFileSystem();
-    String containerName = String.format("wasbtests-%s-%tQ%s",
-        System.getProperty("user.name"), new Date(), containerNameSuffix);
+    String containerName = useContainerSuffixAsContainerName
+        ? containerNameSuffix
+        : String.format(
+            "wasbtests-%s-%tQ%s",
+            System.getProperty("user.name"),
+            new Date(),
+            containerNameSuffix);
     container = account.createCloudBlobClient().getContainerReference(
         containerName);
     if (createOptions.contains(CreateOptions.CreateContainer)) {
-      container.create();
+      container.createIfNotExists();
     }
     String accountName = conf.get(TEST_ACCOUNT_NAME_PROPERTY_NAME);
     if (createOptions.contains(CreateOptions.UseSas)) {
@@ -578,7 +603,8 @@ public final class AzureBlobStorageTestAccount {
     // Create test account initializing the appropriate member variables.
     //
     AzureBlobStorageTestAccount testAcct =
-        new AzureBlobStorageTestAccount(fs, account, container);
+        new AzureBlobStorageTestAccount(fs, account, container,
+            useContainerSuffixAsContainerName);
 
     return testAcct;
   }
@@ -824,7 +850,7 @@ public final class AzureBlobStorageTestAccount {
       fs.close();
       fs = null;
     }
-    if (container != null) {
+    if (!skipContainerDelete && container != null) {
       container.deleteIfExists();
       container = null;
     }
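
Taken together, createIfNotExists() plus skipContainerDelete let two test accounts share a fixed-name container without racing on create/delete, which the new test below relies on. A sketch of that pairing (mirrors the test's setUp; imports elided):

    AzureBlobStorageTestAccount v1 = AzureBlobStorageTestAccount.create(
        "testblockblobinputstream",
        EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
        conf, true);   // suffix used as the literal container name
    AzureBlobStorageTestAccount v2 = AzureBlobStorageTestAccount.create(
        "testblockblobinputstream",
        EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class),
        null, true);   // attaches to the same container; cleanup() keeps it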

+ 35 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java

@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.reflect.Method;
+import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -474,12 +475,30 @@ public class MockStorageInterface extends StorageInterface {
     public void downloadRange(long offset, long length, OutputStream os,
         BlobRequestOptions options, OperationContext opContext)
         throws StorageException {
-      throw new NotImplementedException();
+      if (offset < 0 || length <= 0) {
+        throw new IndexOutOfBoundsException();
+      }
+      if (!backingStore.exists(convertUriToDecodedString(uri))) {
+        throw new StorageException("BlobNotFound",
+            "Resource does not exist.",
+            HttpURLConnection.HTTP_NOT_FOUND,
+            null,
+            null);
+      }
+      byte[] content = backingStore.getContent(convertUriToDecodedString(uri));
+      try {
+        os.write(content, (int) offset, (int) length);
+      } catch (IOException e) {
+        throw new StorageException("Unknown error", "Unexpected error", e);
+      }
     }
   }
 
   class MockCloudBlockBlobWrapper extends MockCloudBlobWrapper
     implements CloudBlockBlobWrapper {
+
+    int minimumReadSize = AzureNativeFileSystemStore.DEFAULT_DOWNLOAD_BLOCK_SIZE;
+
     public MockCloudBlockBlobWrapper(URI uri, HashMap<String, String> metadata,
         int length) {
       super(uri, metadata, length);
@@ -492,8 +511,14 @@ public class MockStorageInterface extends StorageInterface {
           metadata);
     }
 
+    @Override
+    public int getStreamMinimumReadSizeInBytes() {
+      return this.minimumReadSize;
+    }
+
     @Override
     public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
+        this.minimumReadSize = minimumReadSizeBytes;
     }
 
     @Override
@@ -546,6 +571,9 @@ public class MockStorageInterface extends StorageInterface {
 
   class MockCloudPageBlobWrapper extends MockCloudBlobWrapper
     implements CloudPageBlobWrapper {
+
+    int minimumReadSize = AzureNativeFileSystemStore.DEFAULT_DOWNLOAD_BLOCK_SIZE;
+
     public MockCloudPageBlobWrapper(URI uri, HashMap<String, String> metadata,
         int length) {
       super(uri, metadata, length);
@@ -570,8 +598,14 @@ public class MockStorageInterface extends StorageInterface {
       throw new NotImplementedException();
     }
 
+    @Override
+    public int getStreamMinimumReadSizeInBytes() {
+      return this.minimumReadSize;
+    }
+
     @Override
     public void setStreamMinimumReadSizeInBytes(int minimumReadSize) {
+        this.minimumReadSize = minimumReadSize;
     }
 
     @Override

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java

@@ -155,7 +155,7 @@ public class TestAzureConcurrentOutOfBandIo {
         "WASB_String.txt");
    writeBlockTask.startWriting();
    int count = 0;
-   DataInputStream inputStream = null;
+   InputStream inputStream = null;
 
    for (int i = 0; i < 5; i++) {
      try {

+ 756 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlockBlobInputStream.java

@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import org.junit.FixMethodOrder;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
+
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+
+import static org.apache.hadoop.test.LambdaTestUtils.*;
+
+/**
+ * Test semantics and performance of the original block blob input stream
+ * (KEY_INPUT_STREAM_VERSION=1) and the new
+ * <code>BlockBlobInputStream</code> (KEY_INPUT_STREAM_VERSION=2).
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+
+public class TestBlockBlobInputStream extends AbstractWasbTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestBlockBlobInputStream.class);
+  private static final int KILOBYTE = 1024;
+  private static final int MEGABYTE = KILOBYTE * KILOBYTE;
+  private static final int TEST_FILE_SIZE = 6 * MEGABYTE;
+  private static final Path TEST_FILE_PATH = new Path(
+      "TestBlockBlobInputStream.txt");
+
+  private AzureBlobStorageTestAccount accountUsingInputStreamV1;
+  private AzureBlobStorageTestAccount accountUsingInputStreamV2;
+  private long testFileLength;
+
+  /**
+   * Long test timeout.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(10 * 60 * 1000);
+  private FileStatus testFileStatus;
+  private Path hugefile;
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = new Configuration();
+    conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1);
+
+    accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create(
+        "testblockblobinputstream",
+        EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
+        conf,
+        true);
+
+    accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create(
+        "testblockblobinputstream",
+        EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class),
+        null,
+        true);
+
+    assumeNotNull(accountUsingInputStreamV1);
+    assumeNotNull(accountUsingInputStreamV2);
+    hugefile = fs.makeQualified(TEST_FILE_PATH);
+    try {
+      testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
+      testFileLength = testFileStatus.getLen();
+    } catch (FileNotFoundException e) {
+      // file doesn't exist
+      testFileLength = 0;
+    }
+  }
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1);
+
+    accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create(
+        "testblockblobinputstream",
+        EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
+        conf,
+        true);
+
+    accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create(
+        "testblockblobinputstream",
+        EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class),
+        null,
+        true);
+
+    assumeNotNull(accountUsingInputStreamV1);
+    assumeNotNull(accountUsingInputStreamV2);
+    return accountUsingInputStreamV1;
+  }
+
+  /**
+   * Create a test file by repeating the characters in the alphabet.
+   * @throws IOException
+   */
+  private void createTestFileAndSetLength() throws IOException {
+    FileSystem fs = accountUsingInputStreamV1.getFileSystem();
+
+    // To reduce test run time, the test file can be reused.
+    if (fs.exists(TEST_FILE_PATH)) {
+      testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
+      testFileLength = testFileStatus.getLen();
+      LOG.info("Reusing test file: {}", testFileStatus);
+      return;
+    }
+
+    int sizeOfAlphabet = ('z' - 'a' + 1);
+    byte[] buffer = new byte[sizeOfAlphabet * KILOBYTE];
+    char character = 'a';
+    for (int i = 0; i < buffer.length; i++) {
+      buffer[i] = (byte) character;
+      character = (character == 'z') ? 'a' : (char) ((int) character + 1);
+    }
+
+    LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH,
+        TEST_FILE_SIZE);
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+
+    try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
+      int bytesWritten = 0;
+      while (bytesWritten < TEST_FILE_SIZE) {
+        outputStream.write(buffer);
+        bytesWritten += buffer.length;
+      }
+      LOG.info("Closing stream {}", outputStream);
+      ContractTestUtils.NanoTimer closeTimer
+          = new ContractTestUtils.NanoTimer();
+      outputStream.close();
+      closeTimer.end("time to close() output stream");
+    }
+    timer.end("time to write %d KB", TEST_FILE_SIZE / 1024);
+    testFileLength = fs.getFileStatus(TEST_FILE_PATH).getLen();
+  }
+
+  void assumeHugeFileExists() throws IOException {
+    ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
+    FileStatus status = fs.getFileStatus(hugefile);
+    ContractTestUtils.assertIsFile(hugefile, status);
+    assertTrue("File " + hugefile + " is empty", status.getLen() > 0);
+  }
+
+  /**
+   * Calculate megabits per second from the specified values for bytes and
+   * milliseconds.
+   * @param bytes The number of bytes.
+   * @param milliseconds The number of milliseconds.
+   * @return The number of megabits per second.
+   */
+  private static double toMbps(long bytes, long milliseconds) {
+    return bytes / 1000.0 * 8 / milliseconds;
+  }
+
+  @Test
+  public void test_0100_CreateHugeFile() throws IOException {
+    createTestFileAndSetLength();
+  }
+
+  /**
+   * Validates the implementation of InputStream.markSupported.
+   * @throws IOException
+   */
+  @Test
+  public void test_0301_MarkSupportedV1() throws IOException {
+    validateMarkSupported(accountUsingInputStreamV1.getFileSystem());
+  }
+
+  /**
+   * Validates the implementation of InputStream.markSupported.
+   * @throws IOException
+   */
+  @Test
+  public void test_0302_MarkSupportedV2() throws IOException {
+    validateMarkSupported(accountUsingInputStreamV2.getFileSystem());
+  }
+
+  private void validateMarkSupported(FileSystem fs) throws IOException {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      assertTrue("mark is not supported", inputStream.markSupported());
+    }
+  }
+
+  /**
+   * Validates the implementation of InputStream.mark and reset
+   * for version 1 of the block blob input stream.
+   * @throws Exception
+   */
+  @Test
+  public void test_0303_MarkAndResetV1() throws Exception {
+    validateMarkAndReset(accountUsingInputStreamV1.getFileSystem());
+  }
+
+  /**
+   * Validates the implementation of InputStream.mark and reset
+   * for version 2 of the block blob input stream.
+   * @throws Exception
+   */
+  @Test
+  public void test_0304_MarkAndResetV2() throws Exception {
+    validateMarkAndReset(accountUsingInputStreamV2.getFileSystem());
+  }
+
+  private void validateMarkAndReset(FileSystem fs) throws Exception {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      inputStream.mark(KILOBYTE - 1);
+
+      byte[] buffer = new byte[KILOBYTE];
+      int bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+
+      inputStream.reset();
+      assertEquals("rest -> pos 0", 0, inputStream.getPos());
+
+      inputStream.mark(8 * KILOBYTE - 1);
+
+      buffer = new byte[8 * KILOBYTE];
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+
+      intercept(IOException.class,
+          "Resetting to invalid mark",
+          new Callable<FSDataInputStream>() {
+            @Override
+            public FSDataInputStream call() throws Exception {
+              inputStream.reset();
+              return inputStream;
+            }
+          }
+      );
+    }
+  }
+
+  /**
+   * Validates the implementation of Seekable.seekToNewSource, which should
+   * return false for version 1 of the block blob input stream.
+   * @throws IOException
+   */
+  @Test
+  public void test_0305_SeekToNewSourceV1() throws IOException {
+    validateSeekToNewSource(accountUsingInputStreamV1.getFileSystem());
+  }
+
+  /**
+   * Validates the implementation of Seekable.seekToNewSource, which should
+   * return false for version 2 of the block blob input stream.
+   * @throws IOException
+   */
+  @Test
+  public void test_0306_SeekToNewSourceV2() throws IOException {
+    validateSeekToNewSource(accountUsingInputStreamV2.getFileSystem());
+  }
+
+  private void validateSeekToNewSource(FileSystem fs) throws IOException {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      assertFalse(inputStream.seekToNewSource(0));
+    }
+  }
+
+  /**
+   * Validates the implementation of InputStream.skip and ensures there is no
+   * network I/O for version 1 of the block blob input stream.
+   * @throws Exception
+   */
+  @Test
+  public void test_0307_SkipBoundsV1() throws Exception {
+    validateSkipBounds(accountUsingInputStreamV1.getFileSystem());
+  }
+
+  /**
+   * Validates the implementation of InputStream.skip and ensures there is no
+   * network I/O for version 2 of the block blob input stream.
+   * @throws Exception
+   */
+  @Test
+  public void test_0308_SkipBoundsV2() throws Exception {
+    validateSkipBounds(accountUsingInputStreamV2.getFileSystem());
+  }
+
+  private void validateSkipBounds(FileSystem fs) throws Exception {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      NanoTimer timer = new NanoTimer();
+
+      long skipped = inputStream.skip(-1);
+      assertEquals(0, skipped);
+
+      skipped = inputStream.skip(0);
+      assertEquals(0, skipped);
+
+      assertTrue(testFileLength > 0);
+
+      skipped = inputStream.skip(testFileLength);
+      assertEquals(testFileLength, skipped);
+
+      intercept(EOFException.class,
+          new Callable<Long>() {
+            @Override
+            public Long call() throws Exception {
+              return inputStream.skip(1);
+            }
+          }
+      );
+      long elapsedTimeMs = timer.elapsedTimeMs();
+      assertTrue(
+          String.format(
+              "There should not be any network I/O (elapsedTimeMs=%1$d).",
+              elapsedTimeMs),
+          elapsedTimeMs < 20);
+    }
+  }
+
+  /**
+   * Validates the implementation of Seekable.seek and ensures there is no
+   * network I/O for forward seek.
+   * @throws Exception
+   */
+  @Test
+  public void test_0309_SeekBoundsV1() throws Exception {
+    validateSeekBounds(accountUsingInputStreamV1.getFileSystem());
+  }
+
+  /**
+   * Validates the implementation of Seekable.seek and ensures there is no
+   * network I/O for forward seek.
+   * @throws Exception
+   */
+  @Test
+  public void test_0310_SeekBoundsV2() throws Exception {
+    validateSeekBounds(accountUsingInputStreamV2.getFileSystem());
+  }
+
+  private void validateSeekBounds(FileSystem fs) throws Exception {
+    assumeHugeFileExists();
+    try (
+        FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
+    ) {
+      NanoTimer timer = new NanoTimer();
+
+      inputStream.seek(0);
+      assertEquals(0, inputStream.getPos());
+
+      intercept(EOFException.class,
+          FSExceptionMessages.NEGATIVE_SEEK,
+          new Callable<FSDataInputStream>() {
+            @Override
+            public FSDataInputStream call() throws Exception {
+              inputStream.seek(-1);
+              return inputStream;
+            }
+          }
+      );
+
+      assertTrue("Test file length only " + testFileLength, testFileLength > 0);
+      inputStream.seek(testFileLength);
+      assertEquals(testFileLength, inputStream.getPos());
+
+      intercept(EOFException.class,
+          FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
+          new Callable<FSDataInputStream>() {
+            @Override
+            public FSDataInputStream call() throws Exception {
+              inputStream.seek(testFileLength + 1);
+              return inputStream;
+            }
+          }
+      );
+
+      long elapsedTimeMs = timer.elapsedTimeMs();
+      assertTrue(
+          String.format(
+              "There should not be any network I/O (elapsedTimeMs=%1$d).",
+              elapsedTimeMs),
+          elapsedTimeMs < 20);
+    }
+  }
+
+  /**
+   * Validates the implementation of Seekable.seek, Seekable.getPos,
+   * and InputStream.available.
+   * @throws Exception
+   */
+  @Test
+  public void test_0311_SeekAndAvailableAndPositionV1() throws Exception {
+    validateSeekAndAvailableAndPosition(
+        accountUsingInputStreamV1.getFileSystem());
+  }
+
+  /**
+   * Validates the implementation of Seekable.seek, Seekable.getPos,
+   * and InputStream.available.
+   * @throws Exception
+   */
+  @Test
+  public void test_0312_SeekAndAvailableAndPositionV2() throws Exception {
+    validateSeekAndAvailableAndPosition(
+        accountUsingInputStreamV2.getFileSystem());
+  }
+
+  private void validateSeekAndAvailableAndPosition(FileSystem fs)
+      throws Exception {
+    assumeHugeFileExists();
+    try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
+      byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
+      byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
+      byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'};
+      byte[] buffer = new byte[3];
+
+      int bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected1, buffer);
+      assertEquals(buffer.length, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected2, buffer);
+      assertEquals(2 * buffer.length, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // reverse seek
+      int seekPos = 0;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected1, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // reverse seek
+      seekPos = 1;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected3, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // forward seek
+      seekPos = 6;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected4, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+    }
+  }
+
+  /**
+   * Validates the implementation of InputStream.skip, Seekable.getPos,
+   * and InputStream.available.
+   * @throws IOException
+   */
+  @Test
+  public void test_0313_SkipAndAvailableAndPositionV1() throws IOException {
+    validateSkipAndAvailableAndPosition(
+        accountUsingInputStreamV1.getFileSystem());
+  }
+
+  /**
+   * Validates the implementation of InputStream.skip, Seekable.getPos,
+   * and InputStream.available.
+   * @throws IOException
+   */
+  @Test
+  public void test_0314_SkipAndAvailableAndPositionV2() throws IOException {
+    validateSkipAndAvailableAndPosition(
+        accountUsingInputStreamV2.getFileSystem());
+  }
+
+  private void validateSkipAndAvailableAndPosition(FileSystem fs)
+      throws IOException {
+    assumeHugeFileExists();
+    try (
+        FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
+    ) {
+      byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
+      byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
+      byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
+      byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'};
+
+      assertEquals(testFileLength, inputStream.available());
+      assertEquals(0, inputStream.getPos());
+
+      int n = 3;
+      long skipped = inputStream.skip(n);
+
+      assertEquals(skipped, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+      assertEquals(skipped, n);
+
+      byte[] buffer = new byte[3];
+      int bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected2, buffer);
+      assertEquals(buffer.length + skipped, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      // does skip still work after seek?
+      int seekPos = 1;
+      inputStream.seek(seekPos);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected3, buffer);
+      assertEquals(buffer.length + seekPos, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+
+      long currentPosition = inputStream.getPos();
+      n = 2;
+      skipped = inputStream.skip(n);
+
+      assertEquals(currentPosition + skipped, inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+      assertEquals(skipped, n);
+
+      bytesRead = inputStream.read(buffer);
+      assertEquals(buffer.length, bytesRead);
+      assertArrayEquals(expected4, buffer);
+      assertEquals(buffer.length + skipped + currentPosition,
+          inputStream.getPos());
+      assertEquals(testFileLength - inputStream.getPos(),
+          inputStream.available());
+    }
+  }
+
+  /**
+   * Ensures parity in the performance of sequential read for
+   * version 1 and version 2 of the block blob input stream.
+   * @throws IOException
+   */
+  @Test
+  public void test_0315_SequentialReadPerformance() throws IOException {
+    assumeHugeFileExists();
+    final int maxAttempts = 10;
+    final double maxAcceptableRatio = 1.01;
+    double v1ElapsedMs = 0, v2ElapsedMs = 0;
+    double ratio = Double.MAX_VALUE;
+    for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
+      v1ElapsedMs = sequentialRead(1,
+          accountUsingInputStreamV1.getFileSystem(), false);
+      v2ElapsedMs = sequentialRead(2,
+          accountUsingInputStreamV2.getFileSystem(), false);
+      ratio = v2ElapsedMs / v1ElapsedMs;
+      LOG.info(String.format(
+          "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f",
+          (long) v1ElapsedMs,
+          (long) v2ElapsedMs,
+          ratio));
+    }
+    assertTrue(String.format(
+        "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d,"
+            + " v2ElapsedMs=%2$d, ratio=%3$.2f",
+        (long) v1ElapsedMs,
+        (long) v2ElapsedMs,
+        ratio),
+        ratio < maxAcceptableRatio);
+  }
+
+  /**
+   * Ensures parity in the performance of sequential read after reverse seek for
+   * version 2 of the block blob input stream.
+   * @throws IOException
+   */
+  @Test
+  public void test_0316_SequentialReadAfterReverseSeekPerformanceV2()
+      throws IOException {
+    assumeHugeFileExists();
+    final int maxAttempts = 10;
+    final double maxAcceptableRatio = 1.01;
+    double beforeSeekElapsedMs = 0, afterSeekElapsedMs = 0;
+    double ratio = Double.MAX_VALUE;
+    for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
+      beforeSeekElapsedMs = sequentialRead(2,
+          accountUsingInputStreamV2.getFileSystem(), false);
+      afterSeekElapsedMs = sequentialRead(2,
+          accountUsingInputStreamV2.getFileSystem(), true);
+      ratio = afterSeekElapsedMs / beforeSeekElapsedMs;
+      LOG.info(String.format(
+          "beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d, ratio=%3$.2f",
+          (long) beforeSeekElapsedMs,
+          (long) afterSeekElapsedMs,
+          ratio));
+    }
+    assertTrue(String.format(
+        "Performance of version 2 after reverse seek is not acceptable:"
+            + " beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d,"
+            + " ratio=%3$.2f",
+        (long) beforeSeekElapsedMs,
+        (long) afterSeekElapsedMs,
+        ratio),
+        ratio < maxAcceptableRatio);
+  }
+
+  private long sequentialRead(int version,
+      FileSystem fs,
+      boolean afterReverseSeek) throws IOException {
+    byte[] buffer = new byte[16 * KILOBYTE];
+    long totalBytesRead = 0;
+    long bytesRead = 0;
+
+    try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      if (afterReverseSeek) {
+        // bytesRead starts at 0, so a plain while-loop would never execute;
+        // use do/while to read roughly 4 MB forward before seeking back
+        do {
+          bytesRead = inputStream.read(buffer);
+          totalBytesRead += bytesRead;
+        } while (bytesRead > 0 && totalBytesRead < 4 * MEGABYTE);
+        totalBytesRead = 0;
+        inputStream.seek(0);
+      }
+
+      NanoTimer timer = new NanoTimer();
+      while ((bytesRead = inputStream.read(buffer)) > 0) {
+        totalBytesRead += bytesRead;
+      }
+      long elapsedTimeMs = timer.elapsedTimeMs();
+
+      LOG.info(String.format(
+          "v%1$d: bytesRead=%2$d, elapsedMs=%3$d, Mbps=%4$.2f,"
+              + " afterReverseSeek=%5$s",
+          version,
+          totalBytesRead,
+          elapsedTimeMs,
+          toMbps(totalBytesRead, elapsedTimeMs),
+          afterReverseSeek));
+
+      assertEquals(testFileLength, totalBytesRead);
+      inputStream.close();
+      return elapsedTimeMs;
+    }
+  }
+
+  @Test
+  public void test_0317_RandomReadPerformance() throws IOException {
+    assumeHugeFileExists();
+    final int maxAttempts = 10;
+    final double maxAcceptableRatio = 0.10;
+    double v1ElapsedMs = 0, v2ElapsedMs = 0;
+    double ratio = Double.MAX_VALUE;
+    for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
+      v1ElapsedMs = randomRead(1,
+          accountUsingInputStreamV1.getFileSystem());
+      v2ElapsedMs = randomRead(2,
+          accountUsingInputStreamV2.getFileSystem());
+      ratio = v2ElapsedMs / v1ElapsedMs;
+      LOG.info(String.format(
+          "v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f",
+          (long) v1ElapsedMs,
+          (long) v2ElapsedMs,
+          ratio));
+    }
+    assertTrue(String.format(
+        "Performance of version 2 is not acceptable: v1ElapsedMs=%1$d,"
+            + " v2ElapsedMs=%2$d, ratio=%3$.2f",
+        (long) v1ElapsedMs,
+        (long) v2ElapsedMs,
+        ratio),
+        ratio < maxAcceptableRatio);
+  }
+
+  private long randomRead(int version, FileSystem fs) throws IOException {
+    assumeHugeFileExists();
+    final int minBytesToRead = 2 * MEGABYTE;
+    Random random = new Random();
+    byte[] buffer = new byte[8 * KILOBYTE];
+    long totalBytesRead = 0;
+    long bytesRead = 0;
+    try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
+      NanoTimer timer = new NanoTimer();
+
+      do {
+        bytesRead = inputStream.read(buffer);
+        totalBytesRead += bytesRead;
+        inputStream.seek(random.nextInt(
+            (int) (testFileLength - buffer.length)));
+      } while (bytesRead > 0 && totalBytesRead < minBytesToRead);
+
+      long elapsedTimeMs = timer.elapsedTimeMs();
+
+      inputStream.close();
+
+      LOG.info(String.format(
+          "v%1$d: totalBytesRead=%2$d, elapsedTimeMs=%3$d, Mbps=%4$.2f",
+          version,
+          totalBytesRead,
+          elapsedTimeMs,
+          toMbps(totalBytesRead, elapsedTimeMs)));
+
+      assertTrue(minBytesToRead <= totalBytesRead);
+
+      return elapsedTimeMs;
+    }
+  }
+
+  @Test
+  public void test_999_DeleteHugeFiles() throws IOException {
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    fs.delete(TEST_FILE_PATH, false);
+    timer.end("time to delete %s", TEST_FILE_PATH);
+  }
+
+}
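
As with the other hadoop-azure integration tests, these need a real storage account configured in the module's test resources; with credentials in place, a standard Surefire selection along these lines should run just this class (shown as an assumption, not a project-specific recipe):

    mvn test -Dtest=TestBlockBlobInputStream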