
Revert "HADOOP-17195. OutOfMemory error while performing hdfs CopyFromLocal to ABFS (#3406)" (#3443)

This reverts commit 52c024cc3aac2571e60e69c7f8b620299aad8e27.
Steve Loughran, 3 years ago
Commit: 10f3abeae7
20 changed files with 277 additions and 2211 deletions
  1. + 0 - 5  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
  2. + 0 - 12  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
  3. + 0 - 33  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/BlockUploadStatistics.java
  4. + 0 - 1119  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
  5. + 0 - 7  hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
  6. + 0 - 138  hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestDataBlocks.java
  7. + 2 - 40  hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
  8. + 25 - 140  hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
  9. + 0 - 31  hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
  10. + 0 - 18  hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
  11. + 204 - 265  hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
  12. + 0 - 107  hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
  13. + 1 - 3  hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java
  14. + 1 - 24  hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java
  15. + 0 - 13  hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestConstants.java
  16. + 1 - 1  hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
  17. + 1 - 16  hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsScaleTest.java
  18. + 0 - 97  hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsHugeFiles.java
  19. + 7 - 1  hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
  20. + 35 - 141  hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java

+ 0 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -464,9 +464,4 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   /** Default value for IOStatistics logging level. */
   public static final String IOSTATISTICS_LOGGING_LEVEL_DEFAULT
       = IOSTATISTICS_LOGGING_LEVEL_DEBUG;
-
-  /**
-   * default hadoop temp dir on local system: {@value}.
-   */
-  public static final String HADOOP_TMP_DIR = "hadoop.tmp.dir";
 }
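
For context: the reverted DiskBlockFactory (in the DataBlocks.java diff below) used this key as the fallback buffer-directory key whenever the filesystem-specific key was unset. A minimal sketch of that lookup pattern, using only what this diff shows; the class and method names here are illustrative, not part of Hadoop:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalDirAllocator;

    public final class BufferDirFallback {
      /**
       * Choose the config key naming the buffer directory: the FS-specific
       * key if it is set, otherwise fall back to "hadoop.tmp.dir", as the
       * reverted DiskBlockFactory constructor did.
       */
      static LocalDirAllocator allocatorFor(String keyToBufferDir,
          Configuration conf) {
        String bufferDir = conf.get(keyToBufferDir) != null
            ? keyToBufferDir : "hadoop.tmp.dir";
        return new LocalDirAllocator(bufferDir);
      }
    }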

+ 0 - 12
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

@@ -358,18 +358,6 @@ public final class StreamStatisticNames {
   public static final String REMOTE_BYTES_READ
       = "remote_bytes_read";
 
-  /**
-   * Total number of Data blocks allocated by an outputStream.
-   */
-  public static final String BLOCKS_ALLOCATED
-      = "blocks_allocated";
-
-  /**
-   * Total number of Data blocks released by an outputStream.
-   */
-  public static final String BLOCKS_RELEASED
-      = "blocks_released";
-
   private StreamStatisticNames() {
   }
 

+ 0 - 33
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/BlockUploadStatistics.java

@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.store;
-
-public interface BlockUploadStatistics {
-
-  /**
-   * A block has been allocated.
-   */
-  void blockAllocated();
-
-  /**
-   * A block has been released.
-   */
-  void blockReleased();
-
-}
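
The reverted interface is small; a hypothetical counter-backed implementation (illustrative, not part of the Hadoop codebase) would pair its two callbacks with the "blocks_allocated" and "blocks_released" statistic names removed above:

    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.hadoop.fs.store.BlockUploadStatistics;

    /** Illustrative implementation counting block allocations and releases. */
    public class CountingBlockUploadStatistics
        implements BlockUploadStatistics {

      private final AtomicLong blocksAllocated = new AtomicLong();
      private final AtomicLong blocksReleased = new AtomicLong();

      @Override
      public void blockAllocated() {
        blocksAllocated.incrementAndGet();
      }

      @Override
      public void blockReleased() {
        blocksReleased.incrementAndGet();
      }

      /** Blocks currently held: allocated minus released. */
      public long blocksOutstanding() {
        return blocksAllocated.get() - blocksReleased.get();
      }
    }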

+ 0 - 1119
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java

@@ -1,1119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.store;
-
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.DirectBufferPool;
-
-import static java.util.Objects.requireNonNull;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR;
-import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed;
-import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Upload;
-import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
-import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
-
-/**
- * A class to provide disk, byteBuffer and byteArray options for Filesystem
- * OutputStreams.
- * <ul>
- *   <li>
- *     Disk: Uses disk space to write the blocks. Best suited to avoiding
- *     OutOfMemory errors in the Java heap space.
- *   </li>
- *   <li>
- *     byteBuffer: Uses DirectByteBuffer to allocate memory off-heap to
- *     provide faster writing of DataBlocks, with some risk of running
- *     OutOfMemory.
- *   </li>
- *   <li>
- *     byteArray: Uses byte[] to write a block of data. On-heap, so it
- *     risks running OutOfMemory fairly easily.
- *   </li>
- * </ul>
- * <p>
- * Implementation of DataBlocks taken from HADOOP-13560 to support huge file
- * uploads in S3A with multiple buffering options rather than one.
- */
-public final class DataBlocks {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(DataBlocks.class);
-
-  /**
-   * Buffer blocks to disk.
-   * Capacity is limited to available disk space.
-   */
-  public static final String DATA_BLOCKS_BUFFER_DISK = "disk";
-
-  /**
-   * Use a byte buffer.
-   */
-  public static final String DATA_BLOCKS_BYTEBUFFER = "bytebuffer";
-
-  /**
-   * Use an in-memory array. Fast but will run out of heap rapidly.
-   */
-  public static final String DATA_BLOCKS_BUFFER_ARRAY = "array";
-
-  private DataBlocks() {
-  }
-
-  /**
-   * Validate args to a write command. These are the same validation checks
-   * expected for any implementation of {@code OutputStream.write()}.
-   *
-   * @param b   byte array containing data.
-   * @param off offset in array where to start.
-   * @param len number of bytes to be written.
-   * @throws NullPointerException      for a null buffer
-   * @throws IndexOutOfBoundsException if indices are out of range
-   */
-  public static void validateWriteArgs(byte[] b, int off, int len)
-      throws IOException {
-    Preconditions.checkNotNull(b);
-    if ((off < 0) || (off > b.length) || (len < 0) ||
-        ((off + len) > b.length) || ((off + len) < 0)) {
-      throw new IndexOutOfBoundsException(
-          "write (b[" + b.length + "], " + off + ", " + len + ')');
-    }
-  }
-
-  /**
-   * Create a factory.
-   *
-   * @param keyToBufferDir Key to buffer directory config for a FS.
-   * @param configuration  factory configurations.
-   * @param name           factory name, the option from {@link CommonConfigurationKeys}.
-   * @return the factory, ready to be initialized.
-   * @throws IllegalArgumentException if the name is unknown.
-   */
-  public static BlockFactory createFactory(String keyToBufferDir,
-      Configuration configuration,
-      String name) {
-    LOG.debug("Creating DataFactory of type : {}", name);
-    switch (name) {
-    case DATA_BLOCKS_BUFFER_ARRAY:
-      return new ArrayBlockFactory(keyToBufferDir, configuration);
-    case DATA_BLOCKS_BUFFER_DISK:
-      return new DiskBlockFactory(keyToBufferDir, configuration);
-    case DATA_BLOCKS_BYTEBUFFER:
-      return new ByteBufferBlockFactory(keyToBufferDir, configuration);
-    default:
-      throw new IllegalArgumentException("Unsupported block buffer" +
-          " \"" + name + '"');
-    }
-  }
-
-  /**
-   * The output information for an upload.
-   * It can be one of a file, an input stream or a byteArray.
-   * The {@link #toByteArray()} method is used to convert the data into a
-   * byte array; that conversion is done in this class as well.
-   * When closed, any stream is closed. Any source file is untouched.
-   */
-  public static final class BlockUploadData implements Closeable {
-    private final File file;
-    private InputStream uploadStream;
-    private byte[] byteArray;
-    private boolean isClosed;
-
-    /**
-     * Constructor for byteArray upload data block. File and uploadStream
-     * would be null.
-     *
-     * @param byteArray byteArray used to construct BlockUploadData.
-     */
-    public BlockUploadData(byte[] byteArray) {
-      this.file = null;
-      this.uploadStream = null;
-
-      this.byteArray = requireNonNull(byteArray);
-    }
-
-    /**
-     * File constructor; input stream and byteArray will be null.
-     *
-     * @param file file to upload
-     */
-    BlockUploadData(File file) {
-      Preconditions.checkArgument(file.exists(), "No file: %s", file);
-      this.file = file;
-      this.uploadStream = null;
-      this.byteArray = null;
-    }
-
-    /**
-     * Stream constructor, file and byteArray field will be null.
-     *
-     * @param uploadStream stream to upload.
-     */
-    BlockUploadData(InputStream uploadStream) {
-      requireNonNull(uploadStream, "rawUploadStream");
-      this.uploadStream = uploadStream;
-      this.file = null;
-      this.byteArray = null;
-    }
-
-    /**
-     * Predicate: does this instance contain a file reference.
-     *
-     * @return true if there is a file.
-     */
-    boolean hasFile() {
-      return file != null;
-    }
-
-    /**
-     * Get the file, if there is one.
-     *
-     * @return the file for uploading, or null.
-     */
-    File getFile() {
-      return file;
-    }
-
-    /**
-     * Get the raw upload stream, if the object was
-     * created with one.
-     *
-     * @return the upload stream or null.
-     */
-    InputStream getUploadStream() {
-      return uploadStream;
-    }
-
-    /**
-     * Convert to a byte array.
-     * If the data is stored in a file, it will be read and returned.
-     * If the data was passed in via an input stream (which happens if the
-     * data is stored in a bytebuffer) then it will be converted to a byte
-     * array, which will then be cached for any subsequent use.
-     *
-     * @return byte[] after converting the uploadBlock.
-     * @throws IOException throw if an exception is caught while reading
-     *                     File/InputStream or closing InputStream.
-     */
-    public byte[] toByteArray() throws IOException {
-      Preconditions.checkState(!isClosed, "Block is closed");
-      if (byteArray != null) {
-        return byteArray;
-      }
-      if (file != null) {
-        // Need to save byteArray here so that we don't read the File if
-        // toByteArray() is called more than once on the same file.
-        byteArray = FileUtils.readFileToByteArray(file);
-        return byteArray;
-      }
-      byteArray = IOUtils.toByteArray(uploadStream);
-      IOUtils.close(uploadStream);
-      uploadStream = null;
-      return byteArray;
-    }
-
-    /**
-     * Close: closes any upload stream and byteArray provided in the
-     * constructor.
-     *
-     * @throws IOException inherited exception.
-     */
-    @Override
-    public void close() throws IOException {
-      isClosed = true;
-      cleanupWithLogger(LOG, uploadStream);
-      byteArray = null;
-      if (file != null) {
-        file.delete();
-      }
-    }
-  }
-
-  /**
-   * Base class for block factories.
-   */
-  public static abstract class BlockFactory implements Closeable {
-
-    private final String keyToBufferDir;
-    private final Configuration conf;
-
-    protected BlockFactory(String keyToBufferDir, Configuration conf) {
-      this.keyToBufferDir = keyToBufferDir;
-      this.conf = conf;
-    }
-
-    /**
-     * Create a block.
-     *
-     * @param index      index of block
-     * @param limit      limit of the block.
-     * @param statistics stats to work with
-     * @return a new block.
-     */
-    public abstract DataBlock create(long index, int limit,
-        BlockUploadStatistics statistics)
-        throws IOException;
-
-    /**
-     * Implement any close/cleanup operation.
-     * Base class is a no-op.
-     *
-     * @throws IOException Inherited exception; implementations should
-     *                     avoid raising it.
-     */
-    @Override
-    public void close() throws IOException {
-    }
-
-    /**
-     * Configuration.
-     *
-     * @return config passed to create the factory.
-     */
-    protected Configuration getConf() {
-      return conf;
-    }
-
-    /**
-     * Key to Buffer Directory config for a FS instance.
-     *
-     * @return String containing key to Buffer dir.
-     */
-    public String getKeyToBufferDir() {
-      return keyToBufferDir;
-    }
-  }
-
-  /**
-   * This represents a block being uploaded.
-   */
-  public static abstract class DataBlock implements Closeable {
-
-    enum DestState {Writing, Upload, Closed}
-
-    private volatile DestState state = Writing;
-    protected final long index;
-    private final BlockUploadStatistics statistics;
-
-    protected DataBlock(long index,
-        BlockUploadStatistics statistics) {
-      this.index = index;
-      this.statistics = statistics;
-    }
-
-    /**
-     * Atomically enter a state, verifying current state.
-     *
-     * @param current current state. null means "no check"
-     * @param next    next state
-     * @throws IllegalStateException if the current state is not as expected
-     */
-    protected synchronized final void enterState(DestState current,
-        DestState next)
-        throws IllegalStateException {
-      verifyState(current);
-      LOG.debug("{}: entering state {}", this, next);
-      state = next;
-    }
-
-    /**
-     * Verify that the block is in the declared state.
-     *
-     * @param expected expected state.
-     * @throws IllegalStateException if the DataBlock is in the wrong state
-     */
-    protected final void verifyState(DestState expected)
-        throws IllegalStateException {
-      if (expected != null && state != expected) {
-        throw new IllegalStateException("Expected stream state " + expected
-            + " -but actual state is " + state + " in " + this);
-      }
-    }
-
-    /**
-     * Current state.
-     *
-     * @return the current state.
-     */
-    final DestState getState() {
-      return state;
-    }
-
-    /**
-     * Return the current data size.
-     *
-     * @return the size of the data.
-     */
-    public abstract int dataSize();
-
-    /**
-     * Predicate to verify that the block has the capacity to write
-     * the given set of bytes.
-     *
-     * @param bytes number of bytes desired to be written.
-     * @return true if there is enough space.
-     */
-    abstract boolean hasCapacity(long bytes);
-
-    /**
-     * Predicate to check if there is data in the block.
-     *
-     * @return true if there is data in the block.
-     */
-    public boolean hasData() {
-      return dataSize() > 0;
-    }
-
-    /**
-     * The remaining capacity in the block before it is full.
-     *
-     * @return the number of bytes remaining.
-     */
-    public abstract int remainingCapacity();
-
-    /**
-     * Write a series of bytes from the buffer, from the offset.
-     * Returns the number of bytes written.
-     * Only valid in the state {@code Writing}.
-     * Base class verifies the state but does no writing.
-     *
-     * @param buffer buffer.
-     * @param offset offset.
-     * @param length length of write.
-     * @return number of bytes written.
-     * @throws IOException trouble
-     */
-    public int write(byte[] buffer, int offset, int length) throws IOException {
-      verifyState(Writing);
-      Preconditions.checkArgument(buffer != null, "Null buffer");
-      Preconditions.checkArgument(length >= 0, "length is negative");
-      Preconditions.checkArgument(offset >= 0, "offset is negative");
-      Preconditions.checkArgument(
-          !(buffer.length - offset < length),
-          "buffer shorter than amount of data to write");
-      return 0;
-    }
-
-    /**
-     * Flush the output.
-     * Only valid in the state {@code Writing}.
-     * In the base class, this is a no-op
-     *
-     * @throws IOException any IO problem.
-     */
-    public void flush() throws IOException {
-      verifyState(Writing);
-    }
-
-    /**
-     * Switch to the upload state and return a stream for uploading.
-     * Base class calls {@link #enterState(DestState, DestState)} to
-     * manage the state machine.
-     *
-     * @return the stream.
-     * @throws IOException trouble
-     */
-    public BlockUploadData startUpload() throws IOException {
-      LOG.debug("Start datablock[{}] upload", index);
-      enterState(Writing, Upload);
-      return null;
-    }
-
-    /**
-     * Enter the closed state.
-     *
-     * @return true if the class was in any other state, implying that
-     * the subclass should do its close operations.
-     */
-    protected synchronized boolean enterClosedState() {
-      if (!state.equals(Closed)) {
-        enterState(null, Closed);
-        return true;
-      } else {
-        return false;
-      }
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (enterClosedState()) {
-        LOG.debug("Closed {}", this);
-        innerClose();
-      }
-    }
-
-    /**
-     * Inner close logic for subclasses to implement.
-     */
-    protected void innerClose() throws IOException {
-
-    }
-
-    /**
-     * A block has been allocated.
-     */
-    protected void blockAllocated() {
-      if (statistics != null) {
-        statistics.blockAllocated();
-      }
-    }
-
-    /**
-     * A block has been released.
-     */
-    protected void blockReleased() {
-      if (statistics != null) {
-        statistics.blockReleased();
-      }
-    }
-
-    protected BlockUploadStatistics getStatistics() {
-      return statistics;
-    }
-  }
-
-  // ====================================================================
-
-  /**
-   * Use byte arrays on the heap for storage.
-   */
-  static class ArrayBlockFactory extends BlockFactory {
-
-    ArrayBlockFactory(String keyToBufferDir, Configuration conf) {
-      super(keyToBufferDir, conf);
-    }
-
-    @Override
-    public DataBlock create(long index, int limit,
-        BlockUploadStatistics statistics)
-        throws IOException {
-      return new ByteArrayBlock(0, limit, statistics);
-    }
-
-  }
-
-  static class DataBlockByteArrayOutputStream extends ByteArrayOutputStream {
-
-    DataBlockByteArrayOutputStream(int size) {
-      super(size);
-    }
-
-    /**
-     * InputStream backed by the internal byte array.
-     *
-     * @return ByteArrayInputStream instance.
-     */
-    ByteArrayInputStream getInputStream() {
-      ByteArrayInputStream bin = new ByteArrayInputStream(this.buf, 0, count);
-      this.reset();
-      this.buf = null;
-      return bin;
-    }
-  }
-
-  /**
-   * Stream to memory via a {@code ByteArrayOutputStream}.
-   * <p>
-   * It can consume a lot of heap space
-   * proportional to the mismatch between writes to the stream and
-   * the JVM-wide upload bandwidth to a Store's endpoint.
-   * The memory consumption can be limited by tuning the filesystem settings
-   * to restrict the number of queued/active uploads.
-   */
-
-  static class ByteArrayBlock extends DataBlock {
-    private DataBlockByteArrayOutputStream buffer;
-    private final int limit;
-    // cache data size so that it is consistent after the buffer is reset.
-    private Integer dataSize;
-
-    ByteArrayBlock(long index,
-        int limit,
-        BlockUploadStatistics statistics) {
-      super(index, statistics);
-      this.limit = limit;
-      this.buffer = new DataBlockByteArrayOutputStream(limit);
-      blockAllocated();
-    }
-
-    /**
-     * Get the amount of data; if there is no buffer then the size is 0.
-     *
-     * @return the amount of data available to upload.
-     */
-    @Override
-    public int dataSize() {
-      return dataSize != null ? dataSize : buffer.size();
-    }
-
-    @Override
-    public BlockUploadData startUpload() throws IOException {
-      super.startUpload();
-      dataSize = buffer.size();
-      ByteArrayInputStream bufferData = buffer.getInputStream();
-      buffer = null;
-      return new BlockUploadData(bufferData);
-    }
-
-    @Override
-    boolean hasCapacity(long bytes) {
-      return dataSize() + bytes <= limit;
-    }
-
-    @Override
-    public int remainingCapacity() {
-      return limit - dataSize();
-    }
-
-    @Override
-    public int write(byte[] b, int offset, int len) throws IOException {
-      super.write(b, offset, len);
-      int written = Math.min(remainingCapacity(), len);
-      buffer.write(b, offset, written);
-      return written;
-    }
-
-    @Override
-    protected void innerClose() {
-      buffer = null;
-      blockReleased();
-    }
-
-    @Override
-    public String toString() {
-      return "ByteArrayBlock{"
-          + "index=" + index +
-          ", state=" + getState() +
-          ", limit=" + limit +
-          ", dataSize=" + dataSize +
-          '}';
-    }
-  }
-
-  // ====================================================================
-
-  /**
-   * Stream via Direct ByteBuffers; these are allocated off heap
-   * via {@link DirectBufferPool}.
-   */
-
-  static class ByteBufferBlockFactory extends BlockFactory {
-
-    private final DirectBufferPool bufferPool = new DirectBufferPool();
-    private final AtomicInteger buffersOutstanding = new AtomicInteger(0);
-
-    ByteBufferBlockFactory(String keyToBufferDir, Configuration conf) {
-      super(keyToBufferDir, conf);
-    }
-
-    @Override public ByteBufferBlock create(long index, int limit,
-        BlockUploadStatistics statistics)
-        throws IOException {
-      return new ByteBufferBlock(index, limit, statistics);
-    }
-
-    private ByteBuffer requestBuffer(int limit) {
-      LOG.debug("Requesting buffer of size {}", limit);
-      buffersOutstanding.incrementAndGet();
-      return bufferPool.getBuffer(limit);
-    }
-
-    private void releaseBuffer(ByteBuffer buffer) {
-      LOG.debug("Releasing buffer");
-      bufferPool.returnBuffer(buffer);
-      buffersOutstanding.decrementAndGet();
-    }
-
-    /**
-     * Get count of outstanding buffers.
-     *
-     * @return the current buffer count.
-     */
-    public int getOutstandingBufferCount() {
-      return buffersOutstanding.get();
-    }
-
-    @Override
-    public String toString() {
-      return "ByteBufferBlockFactory{"
-          + "buffersOutstanding=" + buffersOutstanding +
-          '}';
-    }
-
-    /**
-     * A DataBlock which requests a buffer from the pool on creation; returns
-     * it when it is closed.
-     */
-    class ByteBufferBlock extends DataBlock {
-      private ByteBuffer blockBuffer;
-      private final int bufferSize;
-      // cache data size so that it is consistent after the buffer is reset.
-      private Integer dataSize;
-
-      /**
-       * Instantiate. This will request a ByteBuffer of the desired size.
-       *
-       * @param index      block index.
-       * @param bufferSize buffer size.
-       * @param statistics statistics to update.
-       */
-      ByteBufferBlock(long index,
-          int bufferSize,
-          BlockUploadStatistics statistics) {
-        super(index, statistics);
-        this.bufferSize = bufferSize;
-        this.blockBuffer = requestBuffer(bufferSize);
-        blockAllocated();
-      }
-
-      /**
-       * Get the amount of data; if there is no buffer then the size is 0.
-       *
-       * @return the amount of data available to upload.
-       */
-      @Override public int dataSize() {
-        return dataSize != null ? dataSize : bufferCapacityUsed();
-      }
-
-      @Override
-      public BlockUploadData startUpload() throws IOException {
-        super.startUpload();
-        dataSize = bufferCapacityUsed();
-        // set the buffer up for reading from the beginning
-        blockBuffer.limit(blockBuffer.position());
-        blockBuffer.position(0);
-        return new BlockUploadData(
-            new ByteBufferInputStream(dataSize, blockBuffer));
-      }
-
-      @Override
-      public boolean hasCapacity(long bytes) {
-        return bytes <= remainingCapacity();
-      }
-
-      @Override
-      public int remainingCapacity() {
-        return blockBuffer != null ? blockBuffer.remaining() : 0;
-      }
-
-      private int bufferCapacityUsed() {
-        return blockBuffer.capacity() - blockBuffer.remaining();
-      }
-
-      @Override
-      public int write(byte[] b, int offset, int len) throws IOException {
-        super.write(b, offset, len);
-        int written = Math.min(remainingCapacity(), len);
-        blockBuffer.put(b, offset, written);
-        return written;
-      }
-
-      /**
-       * Closing the block will release the buffer.
-       */
-      @Override
-      protected void innerClose() {
-        if (blockBuffer != null) {
-          blockReleased();
-          releaseBuffer(blockBuffer);
-          blockBuffer = null;
-        }
-      }
-
-      @Override
-      public String toString() {
-        return "ByteBufferBlock{"
-            + "index=" + index +
-            ", state=" + getState() +
-            ", dataSize=" + dataSize() +
-            ", limit=" + bufferSize +
-            ", remainingCapacity=" + remainingCapacity() +
-            '}';
-      }
-
-      /**
-       * Provide an input stream from a byte buffer; supporting
-       * {@link #mark(int)}, which is required to enable replay of failed
-       * PUT attempts.
-       */
-      class ByteBufferInputStream extends InputStream {
-
-        private final int size;
-        private ByteBuffer byteBuffer;
-
-        ByteBufferInputStream(int size,
-            ByteBuffer byteBuffer) {
-          LOG.debug("Creating ByteBufferInputStream of size {}", size);
-          this.size = size;
-          this.byteBuffer = byteBuffer;
-        }
-
-        /**
-         * After the stream is closed, set the local reference to the byte
-         * buffer to null; this guarantees that future attempts to use
-         * stream methods will fail.
-         */
-        @Override
-        public synchronized void close() {
-          LOG.debug("ByteBufferInputStream.close() for {}",
-              ByteBufferBlock.super.toString());
-          byteBuffer = null;
-        }
-
-        /**
-         * Verify that the stream is open.
-         *
-         * @throws IOException if the stream is closed
-         */
-        private void verifyOpen() throws IOException {
-          if (byteBuffer == null) {
-            throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
-          }
-        }
-
-        public synchronized int read() throws IOException {
-          if (available() > 0) {
-            return byteBuffer.get() & 0xFF;
-          } else {
-            return -1;
-          }
-        }
-
-        @Override
-        public synchronized long skip(long offset) throws IOException {
-          verifyOpen();
-          long newPos = position() + offset;
-          if (newPos < 0) {
-            throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
-          }
-          if (newPos > size) {
-            throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
-          }
-          byteBuffer.position((int) newPos);
-          return newPos;
-        }
-
-        @Override
-        public synchronized int available() {
-          Preconditions.checkState(byteBuffer != null,
-              FSExceptionMessages.STREAM_IS_CLOSED);
-          return byteBuffer.remaining();
-        }
-
-        /**
-         * Get the current buffer position.
-         *
-         * @return the buffer position
-         */
-        public synchronized int position() {
-          return byteBuffer.position();
-        }
-
-        /**
-         * Check if there is data left.
-         *
-         * @return true if there is data remaining in the buffer.
-         */
-        public synchronized boolean hasRemaining() {
-          return byteBuffer.hasRemaining();
-        }
-
-        @Override
-        public synchronized void mark(int readlimit) {
-          LOG.debug("mark at {}", position());
-          byteBuffer.mark();
-        }
-
-        @Override
-        public synchronized void reset() throws IOException {
-          LOG.debug("reset");
-          byteBuffer.reset();
-        }
-
-        @Override
-        public boolean markSupported() {
-          return true;
-        }
-
-        /**
-         * Read in data.
-         *
-         * @param b      destination buffer.
-         * @param offset offset within the buffer.
-         * @param length length of bytes to read.
-         * @throws EOFException              if the position is negative
-         * @throws IndexOutOfBoundsException if there isn't space for the
-         *                                   amount of data requested.
-         * @throws IllegalArgumentException  other arguments are invalid.
-         */
-        @SuppressWarnings("NullableProblems")
-        public synchronized int read(byte[] b, int offset, int length)
-            throws IOException {
-          Preconditions.checkArgument(length >= 0, "length is negative");
-          Preconditions.checkArgument(b != null, "Null buffer");
-          if (b.length - offset < length) {
-            throw new IndexOutOfBoundsException(
-                FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
-                    + ": request length =" + length
-                    + ", with offset =" + offset
-                    + "; buffer capacity =" + (b.length - offset));
-          }
-          verifyOpen();
-          if (!hasRemaining()) {
-            return -1;
-          }
-
-          int toRead = Math.min(length, available());
-          byteBuffer.get(b, offset, toRead);
-          return toRead;
-        }
-
-        @Override
-        public String toString() {
-          final StringBuilder sb = new StringBuilder(
-              "ByteBufferInputStream{");
-          sb.append("size=").append(size);
-          ByteBuffer buf = this.byteBuffer;
-          if (buf != null) {
-            sb.append(", available=").append(buf.remaining());
-          }
-          sb.append(", ").append(ByteBufferBlock.super.toString());
-          sb.append('}');
-          return sb.toString();
-        }
-      }
-    }
-  }
-
-  // ====================================================================
-
-  /**
-   * Buffer blocks to disk.
-   */
-  static class DiskBlockFactory extends BlockFactory {
-
-    private LocalDirAllocator directoryAllocator;
-
-    DiskBlockFactory(String keyToBufferDir, Configuration conf) {
-      super(keyToBufferDir, conf);
-      String bufferDir = conf.get(keyToBufferDir) != null
-          ? keyToBufferDir : HADOOP_TMP_DIR;
-      directoryAllocator = new LocalDirAllocator(bufferDir);
-    }
-
-    /**
-     * Create a temp file and a {@link DiskBlock} instance to manage it.
-     *
-     * @param index      block index.
-     * @param limit      limit of the block.
-     * @param statistics statistics to update.
-     * @return the new block.
-     * @throws IOException IO problems
-     */
-    @Override
-    public DataBlock create(long index,
-        int limit,
-        BlockUploadStatistics statistics)
-        throws IOException {
-      File destFile = createTmpFileForWrite(String.format("datablock-%04d-",
-          index),
-          limit, getConf());
-
-      return new DiskBlock(destFile, limit, index, statistics);
-    }
-
-    /**
-     * Demand create the directory allocator, then create a temporary file.
-     * This does not mark the file for deletion when a process exits.
-     * See {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}.
-     *
-     * @param pathStr prefix for the temporary file.
-     * @param size    the size of the file that is going to be written.
-     * @param conf    the Configuration object.
-     * @return a unique temporary file.
-     * @throws IOException IO problems
-     */
-    File createTmpFileForWrite(String pathStr, long size,
-        Configuration conf) throws IOException {
-      Path path = directoryAllocator.getLocalPathForWrite(pathStr,
-          size, conf);
-      File dir = new File(path.getParent().toUri().getPath());
-      String prefix = path.getName();
-      // create a temp file in this directory
-      return File.createTempFile(prefix, null, dir);
-    }
-  }
-
-  /**
-   * Stream to a file.
-   * This will stop at the limit; the caller is expected to create a new block.
-   */
-  static class DiskBlock extends DataBlock {
-
-    private int bytesWritten;
-    private final File bufferFile;
-    private final int limit;
-    private BufferedOutputStream out;
-    private final AtomicBoolean closed = new AtomicBoolean(false);
-
-    DiskBlock(File bufferFile,
-        int limit,
-        long index,
-        BlockUploadStatistics statistics)
-        throws FileNotFoundException {
-      super(index, statistics);
-      this.limit = limit;
-      this.bufferFile = bufferFile;
-      blockAllocated();
-      out = new BufferedOutputStream(new FileOutputStream(bufferFile));
-    }
-
-    @Override public int dataSize() {
-      return bytesWritten;
-    }
-
-    @Override
-    boolean hasCapacity(long bytes) {
-      return dataSize() + bytes <= limit;
-    }
-
-    @Override public int remainingCapacity() {
-      return limit - bytesWritten;
-    }
-
-    @Override
-    public int write(byte[] b, int offset, int len) throws IOException {
-      super.write(b, offset, len);
-      int written = Math.min(remainingCapacity(), len);
-      out.write(b, offset, written);
-      bytesWritten += written;
-      return written;
-    }
-
-    @Override
-    public BlockUploadData startUpload() throws IOException {
-      super.startUpload();
-      try {
-        out.flush();
-      } finally {
-        out.close();
-        out = null;
-      }
-      return new BlockUploadData(bufferFile);
-    }
-
-    /**
-     * The close operation will delete the destination file if it still
-     * exists.
-     *
-     * @throws IOException IO problems
-     */
-    @SuppressWarnings("UnnecessaryDefault")
-    @Override
-    protected void innerClose() throws IOException {
-      final DestState state = getState();
-      LOG.debug("Closing {}", this);
-      switch (state) {
-      case Writing:
-        if (bufferFile.exists()) {
-          // file was not uploaded
-          LOG.debug("Block[{}]: Deleting buffer file as upload did not start",
-              index);
-          closeBlock();
-        }
-        break;
-
-      case Upload:
-        LOG.debug("Block[{}]: Buffer file {} exists —close upload stream",
-            index, bufferFile);
-        break;
-
-      case Closed:
-        closeBlock();
-        break;
-
-      default:
-        // this state can never be reached, but checkstyle complains, so
-        // it is here.
-      }
-    }
-
-    /**
-     * Flush operation will flush to disk.
-     *
-     * @throws IOException IOE raised on FileOutputStream
-     */
-    @Override public void flush() throws IOException {
-      super.flush();
-      out.flush();
-    }
-
-    @Override
-    public String toString() {
-      String sb = "FileBlock{"
-          + "index=" + index
-          + ", destFile=" + bufferFile +
-          ", state=" + getState() +
-          ", dataSize=" + dataSize() +
-          ", limit=" + limit +
-          '}';
-      return sb;
-    }
-
-    /**
-     * Close the block.
-     * This will delete the block's buffer file if the block has
-     * not previously been closed.
-     */
-    void closeBlock() {
-      LOG.debug("block[{}]: closeBlock()", index);
-      if (!closed.getAndSet(true)) {
-        blockReleased();
-        if (!bufferFile.delete() && bufferFile.exists()) {
-          LOG.warn("delete({}) returned false",
-              bufferFile.getAbsoluteFile());
-        }
-      } else {
-        LOG.debug("block[{}]: skipping re-entrant closeBlock()", index);
-      }
-    }
-  }
-}
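
To make the reverted lifecycle concrete: a block moves Writing -> Upload -> Closed, and the BlockUploadData returned by startUpload() owns the data from then on. A minimal sketch mirroring the flow of the TestDataBlocks class removed later in this diff; the class name, buffer-dir key, and sizes are arbitrary:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.store.DataBlocks;

    public final class DataBlocksSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Factory options: "disk", "bytebuffer" or "array".
        DataBlocks.BlockFactory factory = DataBlocks.createFactory(
            "fs.azure.buffer.dir", conf, DataBlocks.DATA_BLOCKS_BUFFER_DISK);

        byte[] data = new byte[1024];
        // Create a 1 KB block (no statistics) and fill it: state is Writing.
        DataBlocks.DataBlock block = factory.create(0, data.length, null);
        block.write(data, 0, data.length);

        // startUpload() moves the block into the Upload state and hands
        // ownership of the buffered data to the BlockUploadData.
        try (DataBlocks.BlockUploadData upload = block.startUpload()) {
          // toByteArray() reads the file/buffer once and caches the result.
          System.out.println("payload bytes: " + upload.toByteArray().length);
        } // closing the upload data deletes any backing temp file
        block.close();   // enters the Closed state, releasing block resources
        factory.close();
      }
    }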

+ 0 - 7
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -2300,13 +2300,6 @@
   </description>
 </property>
 
-  <property>
-    <name>fs.azure.buffer.dir</name>
-    <value>${hadoop.tmp.dir}/abfs</value>
-    <description>Directory path for buffer files needed to upload data blocks
-      in AbfsOutputStream.</description>
-  </property>
-
 <property>
   <name>fs.AbstractFileSystem.gs.impl</name>
   <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
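
The property removed above defaulted the ABFS disk-buffer directory to ${hadoop.tmp.dir}/abfs. While the feature was in place, the location could also be overridden programmatically; a minimal sketch, with an arbitrary path:

    import org.apache.hadoop.conf.Configuration;

    public final class AbfsBufferDirOverride {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Point disk-backed upload blocks at a volume with more space
        // than the default ${hadoop.tmp.dir}/abfs location.
        conf.set("fs.azure.buffer.dir", "/mnt/scratch/abfs");
        System.out.println(conf.get("fs.azure.buffer.dir"));
      }
    }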

+ 0 - 138
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/store/TestDataBlocks.java

@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.store;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.test.LambdaTestUtils;
-
-import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY;
-import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK;
-import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * UTs to test {@link DataBlocks} functionalities.
- */
-public class TestDataBlocks {
-  private final Configuration configuration = new Configuration();
-  private static final int ONE_KB = 1024;
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestDataBlocks.class);
-
-  /**
-   * Test to verify different DataBlocks factories, different operations.
-   */
-  @Test
-  public void testDataBlocksFactory() throws Exception {
-    testCreateFactory(DATA_BLOCKS_BUFFER_DISK);
-    testCreateFactory(DATA_BLOCKS_BUFFER_ARRAY);
-    testCreateFactory(DATA_BLOCKS_BYTEBUFFER);
-  }
-
-  /**
-   * Verify creation of a data block factory and its operations.
-   *
-   * @param nameOfFactory Name of the DataBlock factory to be created.
-   * @throws Exception in case of failure while creating a block.
-   */
-  public void testCreateFactory(String nameOfFactory) throws Exception {
-    LOG.info("Testing: {}", nameOfFactory);
-    DataBlocks.BlockFactory diskFactory =
-        DataBlocks.createFactory("Dir", configuration, nameOfFactory);
-
-    DataBlocks.DataBlock dataBlock = diskFactory.create(0, ONE_KB, null);
-    assertWriteBlock(dataBlock);
-    assertToByteArray(dataBlock);
-    assertCloseBlock(dataBlock);
-  }
-
-  /**
-   * Verify Writing of a dataBlock.
-   *
-   * @param dataBlock DataBlock to be tested.
-   * @throws IOException Throw Exception in case of failures.
-   */
-  private void assertWriteBlock(DataBlocks.DataBlock dataBlock)
-      throws IOException {
-    byte[] oneKbBuff = new byte[ONE_KB];
-    new Random().nextBytes(oneKbBuff);
-    dataBlock.write(oneKbBuff, 0, ONE_KB);
-    // Verify DataBlock state is at Writing.
-    dataBlock.verifyState(DataBlocks.DataBlock.DestState.Writing);
-    // Verify that the DataBlock has data written.
-    assertTrue("Expected Data block to have data", dataBlock.hasData());
-    // Verify the size of data.
-    assertEquals("Mismatch in data size in block", dataBlock.dataSize(),
-        ONE_KB);
-    // Verify that no capacity is left in the data block to write more.
-    assertFalse("Expected the data block to have no capacity to write 1 byte "
-        + "of data", dataBlock.hasCapacity(1));
-  }
-
-  /**
-   * Verify the Conversion of Data blocks into byte[].
-   *
-   * @param dataBlock data block to be tested.
-   * @throws Exception Throw Exception in case of failures.
-   */
-  private void assertToByteArray(DataBlocks.DataBlock dataBlock)
-      throws Exception {
-    DataBlocks.BlockUploadData blockUploadData = dataBlock.startUpload();
-    // Verify that the current state is in upload.
-    dataBlock.verifyState(DataBlocks.DataBlock.DestState.Upload);
-    // Convert the DataBlock upload to byteArray.
-    byte[] bytesWritten = blockUploadData.toByteArray();
-    // Verify that we can call toByteArray() more than once and that it
-    // gives the same byte[].
-    assertEquals("Mismatch in byteArray provided by toByteArray() the second "
-        + "time", bytesWritten, blockUploadData.toByteArray());
-    IOUtils.close(blockUploadData);
-    // Verify that after closing blockUploadData, we can't call toByteArray().
-    LambdaTestUtils.intercept(IllegalStateException.class,
-        "Block is closed",
-        "Expected to throw IllegalStateException.java after closing "
-            + "blockUploadData and trying to call toByteArray()",
-        () -> {
-          blockUploadData.toByteArray();
-        });
-  }
-
-  /**
-   * Verify the close() of data blocks.
-   *
-   * @param dataBlock data block to be tested.
-   * @throws IOException Throw Exception in case of failures.
-   */
-  private void assertCloseBlock(DataBlocks.DataBlock dataBlock)
-      throws IOException {
-    dataBlock.close();
-    // Verify that the current state is in Closed.
-    dataBlock.verifyState(DataBlocks.DataBlock.DestState.Closed);
-  }
-}

+ 2 - 40
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

@@ -90,7 +90,6 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
-import org.apache.hadoop.fs.store.DataBlocks;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -102,11 +101,6 @@ import org.apache.hadoop.util.Progressable;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
 
@@ -131,13 +125,6 @@ public class AzureBlobFileSystem extends FileSystem
   private TracingHeaderFormat tracingHeaderFormat;
   private Listener listener;
 
-  /** Name of blockFactory to be used by AbfsOutputStream. */
-  private String blockOutputBuffer;
-  /** BlockFactory instance to be used. */
-  private DataBlocks.BlockFactory blockFactory;
-  /** Maximum Active blocks per OutputStream. */
-  private int blockOutputActiveBlocks;
-
   @Override
   public void initialize(URI uri, Configuration configuration)
       throws IOException {
@@ -149,33 +136,8 @@ public class AzureBlobFileSystem extends FileSystem
 
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
     abfsCounters = new AbfsCountersImpl(uri);
-    // name of the blockFactory to be used.
-    this.blockOutputBuffer = configuration.getTrimmed(DATA_BLOCKS_BUFFER,
-        DATA_BLOCKS_BUFFER_DEFAULT);
-    // blockFactory used for this FS instance.
-    this.blockFactory =
-        DataBlocks.createFactory(FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR,
-            configuration, blockOutputBuffer);
-    this.blockOutputActiveBlocks =
-        configuration.getInt(FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS,
-            BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT);
-    if (blockOutputActiveBlocks < 1) {
-      blockOutputActiveBlocks = 1;
-    }
-
-    // AzureBlobFileSystemStore with params in builder.
-    AzureBlobFileSystemStore.AzureBlobFileSystemStoreBuilder
-        systemStoreBuilder =
-        new AzureBlobFileSystemStore.AzureBlobFileSystemStoreBuilder()
-            .withUri(uri)
-            .withSecureScheme(this.isSecureScheme())
-            .withConfiguration(configuration)
-            .withAbfsCounters(abfsCounters)
-            .withBlockFactory(blockFactory)
-            .withBlockOutputActiveBlocks(blockOutputActiveBlocks)
-            .build();
-
-    this.abfsStore = new AzureBlobFileSystemStore(systemStoreBuilder);
+    this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(),
+        configuration, abfsCounters);
     LOG.trace("AzureBlobFileSystemStore init complete");
 
     final AbfsConfiguration abfsConfiguration = abfsStore

+ 25 - 140
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -51,8 +51,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.WeakHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
@@ -122,12 +120,8 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.store.DataBlocks;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
-import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
-import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.http.client.utils.URIBuilder;
 
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
@@ -178,23 +172,10 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
    */
   private Set<String> appendBlobDirSet;
 
-  /** BlockFactory being used by this instance.*/
-  private DataBlocks.BlockFactory blockFactory;
-  /** Number of active data blocks per AbfsOutputStream */
-  private int blockOutputActiveBlocks;
-  /** Bounded ThreadPool for this instance. */
-  private ExecutorService boundedThreadPool;
-
-  /**
-   * FileSystem Store for {@link AzureBlobFileSystem} for Abfs operations.
-   * Built using the {@link AzureBlobFileSystemStoreBuilder} with the
-   * required parameters.
-   * @param abfsStoreBuilder Builder for AzureBlobFileSystemStore.
-   * @throws IOException in case of failure during construction.
-   */
-  public AzureBlobFileSystemStore(
-      AzureBlobFileSystemStoreBuilder abfsStoreBuilder) throws IOException {
-    this.uri = abfsStoreBuilder.uri;
+  public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
+                                  Configuration configuration,
+                                  AbfsCounters abfsCounters) throws IOException {
+    this.uri = uri;
     String[] authorityParts = authorityParts(uri);
     final String fileSystemName = authorityParts[0];
     final String accountName = authorityParts[1];
@@ -202,7 +183,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());
 
     try {
-      this.abfsConfiguration = new AbfsConfiguration(abfsStoreBuilder.configuration, accountName);
+      this.abfsConfiguration = new AbfsConfiguration(configuration, accountName);
     } catch (IllegalAccessException exception) {
       throw new FileSystemOperationUnhandledException(exception);
     }
@@ -232,16 +213,16 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     updateInfiniteLeaseDirs();
     this.authType = abfsConfiguration.getAuthType(accountName);
     boolean usingOauth = (authType == AuthType.OAuth);
-    boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : abfsStoreBuilder.isSecureScheme;
+    boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
     this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
-    this.abfsCounters = abfsStoreBuilder.abfsCounters;
+    this.abfsCounters = abfsCounters;
     initializeClient(uri, fileSystemName, accountName, useHttps);
     final Class<? extends IdentityTransformerInterface> identityTransformerClass =
-        abfsStoreBuilder.configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
+        configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
             IdentityTransformerInterface.class);
     try {
       this.identityTransformer =
-          identityTransformerClass.getConstructor(Configuration.class).newInstance(abfsStoreBuilder.configuration);
+          identityTransformerClass.getConstructor(Configuration.class).newInstance(configuration);
     } catch (IllegalAccessException | InstantiationException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException e) {
       throw new IOException(e);
     }
@@ -255,13 +236,6 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
       this.appendBlobDirSet = new HashSet<>(Arrays.asList(
           abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA)));
     }
-    this.blockFactory = abfsStoreBuilder.blockFactory;
-    this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
-    this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
-        abfsConfiguration.getWriteMaxConcurrentRequestCount(),
-        abfsConfiguration.getMaxWriteRequestsToQueue(),
-        10L, TimeUnit.SECONDS,
-        "abfs-bounded");
   }
 
   /**
@@ -298,10 +272,6 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     }
     try {
       Futures.allAsList(futures).get();
-      // shutdown the threadPool and set it to null.
-      HadoopExecutors.shutdown(boundedThreadPool, LOG,
-          30, TimeUnit.SECONDS);
-      boundedThreadPool = null;
     } catch (InterruptedException e) {
       LOG.error("Interrupted freeing leases", e);
       Thread.currentThread().interrupt();
@@ -528,7 +498,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
   public OutputStream createFile(final Path path,
       final FileSystem.Statistics statistics, final boolean overwrite,
       final FsPermission permission, final FsPermission umask,
-      TracingContext tracingContext) throws IOException {
+      TracingContext tracingContext) throws AzureBlobFileSystemException {
     try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
       boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
       LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
@@ -579,14 +549,12 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
       AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
 
       return new AbfsOutputStream(
-          populateAbfsOutputStreamContext(
-              isAppendBlob,
-              lease,
-              client,
-              statistics,
-              relativePath,
-              0,
-              tracingContext));
+          client,
+          statistics,
+          relativePath,
+          0,
+          populateAbfsOutputStreamContext(isAppendBlob, lease),
+          tracingContext);
     }
   }
 
@@ -660,29 +628,8 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     return op;
   }
 
-  /**
-   * Method to populate AbfsOutputStreamContext with different parameters to
-   * be used to construct {@link AbfsOutputStream}.
-   *
-   * @param isAppendBlob   is Append blob support enabled?
-   * @param lease          instance of AbfsLease for this AbfsOutputStream.
-   * @param client         AbfsClient.
-   * @param statistics     FileSystem statistics.
-   * @param path           Path for AbfsOutputStream.
-   * @param position       Position or offset of the file being opened, set to 0
-   *                       when creating a new file, but needs to be set for APPEND
-   *                       calls on the same file.
-   * @param tracingContext instance of TracingContext for this AbfsOutputStream.
-   * @return AbfsOutputStreamContext instance with the desired parameters.
-   */
-  private AbfsOutputStreamContext populateAbfsOutputStreamContext(
-      boolean isAppendBlob,
-      AbfsLease lease,
-      AbfsClient client,
-      FileSystem.Statistics statistics,
-      String path,
-      long position,
-      TracingContext tracingContext) {
+  private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob,
+      AbfsLease lease) {
     int bufferSize = abfsConfiguration.getWriteBufferSize();
     if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
       bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
@@ -697,15 +644,6 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
             .withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
             .withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
             .withLease(lease)
-            .withBlockFactory(blockFactory)
-            .withBlockOutputActiveBlocks(blockOutputActiveBlocks)
-            .withClient(client)
-            .withPosition(position)
-            .withFsStatistics(statistics)
-            .withPath(path)
-            .withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
-                blockOutputActiveBlocks, true))
-            .withTracingContext(tracingContext)
             .build();
   }
 
@@ -817,7 +755,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
 
   public OutputStream openFileForWrite(final Path path,
       final FileSystem.Statistics statistics, final boolean overwrite,
-      TracingContext tracingContext) throws IOException {
+      TracingContext tracingContext) throws AzureBlobFileSystemException {
     try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
       LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}",
               client.getFileSystem(),
@@ -853,14 +791,12 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
       AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
 
       return new AbfsOutputStream(
-          populateAbfsOutputStreamContext(
-              isAppendBlob,
-              lease,
-              client,
-              statistics,
-              relativePath,
-              offset,
-              tracingContext));
+          client,
+          statistics,
+          relativePath,
+          offset,
+          populateAbfsOutputStreamContext(isAppendBlob, lease),
+          tracingContext);
     }
   }
 
@@ -1808,57 +1744,6 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     }
   }
 
-  /**
-   * A builder class for AzureBlobFileSystemStore.
-   */
-  public static final class AzureBlobFileSystemStoreBuilder {
-
-    private URI uri;
-    private boolean isSecureScheme;
-    private Configuration configuration;
-    private AbfsCounters abfsCounters;
-    private DataBlocks.BlockFactory blockFactory;
-    private int blockOutputActiveBlocks;
-
-    public AzureBlobFileSystemStoreBuilder withUri(URI value) {
-      this.uri = value;
-      return this;
-    }
-
-    public AzureBlobFileSystemStoreBuilder withSecureScheme(boolean value) {
-      this.isSecureScheme = value;
-      return this;
-    }
-
-    public AzureBlobFileSystemStoreBuilder withConfiguration(
-        Configuration value) {
-      this.configuration = value;
-      return this;
-    }
-
-    public AzureBlobFileSystemStoreBuilder withAbfsCounters(
-        AbfsCounters value) {
-      this.abfsCounters = value;
-      return this;
-    }
-
-    public AzureBlobFileSystemStoreBuilder withBlockFactory(
-        DataBlocks.BlockFactory value) {
-      this.blockFactory = value;
-      return this;
-    }
-
-    public AzureBlobFileSystemStoreBuilder withBlockOutputActiveBlocks(
-        int value) {
-      this.blockOutputActiveBlocks = value;
-      return this;
-    }
-
-    public AzureBlobFileSystemStoreBuilder build() {
-      return this;
-    }
-  }
-
   @VisibleForTesting
   AbfsClient getClient() {
     return this.client;

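The builder deleted above is a self-returning variant of the builder pattern: build() hands back the builder itself, and the AzureBlobFileSystemStore constructor reads the fields straight off it. A minimal sketch of that shape, using illustrative names rather than the real Hadoop types:

import java.net.URI;

/**
 * Illustrative self-returning builder mirroring the deleted
 * AzureBlobFileSystemStoreBuilder: build() returns the builder and the
 * consumer reads its fields directly.
 */
final class StoreBuilder {
  URI uri;
  boolean secureScheme;

  StoreBuilder withUri(URI value) {
    this.uri = value;
    return this;
  }

  StoreBuilder withSecureScheme(boolean value) {
    this.secureScheme = value;
    return this;
  }

  StoreBuilder build() {
    return this;
  }
}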
+ 0 - 31
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

@@ -56,37 +56,6 @@ public final class ConfigurationKeys {
   public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
   public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
   public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
-
-  /**
-   * Maximum number of blocks a single output stream can have
-   * active (uploading, or queued to the central FileSystem
-   * instance's pool of queued operations).
-   * This stops a single stream overloading the shared thread pool.
-   * {@value}
-   * <p>
-   * Default is {@link FileSystemConfigurations#BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT}
-   */
-  public static final String FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS =
-      "fs.azure.block.upload.active.blocks";
-
-  /**
-   * Buffer directory path for uploading AbfsOutputStream data blocks.
-   * Value: {@value}
-   */
-  public static final String FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR =
-      "fs.azure.buffer.dir";
-
-  /**
-   * What data block buffer to use.
-   * <br>
-   * Options include: "disk"(Default), "array", and "bytebuffer".
-   * <br>
-   * Default is {@link FileSystemConfigurations#DATA_BLOCKS_BUFFER_DEFAULT}.
-   * Value: {@value}
-   */
-  public static final String DATA_BLOCKS_BUFFER =
-      "fs.azure.data.blocks.buffer";
-
   /** If the data size written by Hadoop app is small, i.e. data size :
    *  (a) before any of HFlush/HSync call is made or
    *  (b) between 2 HFlush/Hsync API calls

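These were ordinary Hadoop Configuration keys; a caller would have resolved them as sketched below. The defaults shown (20 active blocks, "disk" buffering) come from the FileSystemConfigurations constants deleted in the next file; the buffer directory has no default visible in this diff, so none is assumed here.

import org.apache.hadoop.conf.Configuration;

public class ReadRemovedKeys {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Cap on blocks one output stream may have active; the default was 20.
    int activeBlocks = conf.getInt("fs.azure.block.upload.active.blocks", 20);
    // Buffer mechanism: "disk" (the default), "array" or "bytebuffer".
    String bufferKind = conf.get("fs.azure.data.blocks.buffer", "disk");
    // Local spill directory for disk-buffered blocks; null if unset.
    String bufferDir = conf.get("fs.azure.buffer.dir");
    System.out.println(activeBlocks + " " + bufferKind + " " + bufferDir);
  }
}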
+ 0 - 18
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

@@ -115,23 +115,5 @@ public final class FileSystemConfigurations {
   public static final int STREAM_ID_LEN = 12;
   public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
 
-  /**
-   * Limit of queued block upload operations before writes
-   * block for an OutputStream. Value: {@value}
-   */
-  public static final int BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT = 20;
-
-  /**
-   * Buffer blocks to disk.
-   * Capacity is limited to available disk space.
-   */
-  public static final String DATA_BLOCKS_BUFFER_DISK = "disk";
-
-  /**
-   * Default buffer option: {@value}.
-   */
-  public static final String DATA_BLOCKS_BUFFER_DEFAULT =
-      DATA_BLOCKS_BUFFER_DISK;
-
   private FileSystemConfigurations() {}
 }

+ 204 - 265
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

@@ -20,20 +20,24 @@ package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
+import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.UUID;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
-import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
@@ -43,9 +47,10 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
 import org.apache.hadoop.fs.azurebfs.utils.Listener;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
-import org.apache.hadoop.fs.store.DataBlocks;
+import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.StreamCapabilities;
@@ -58,7 +63,6 @@ import static org.apache.hadoop.io.IOUtils.wrapException;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE;
 import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE;
-import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState;
 
 /**
  * The BlobFsOutputStream for Rest AbfsClient.
@@ -68,12 +72,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
 
   private final AbfsClient client;
   private final String path;
-  /** The position in the file being uploaded, where the next block would be
-   * uploaded.
-   * This is used in constructing the AbfsClient requests to ensure that,
-   * even if blocks are uploaded out of order, they are reassembled in
-   * correct order.
-   * */
   private long position;
   private boolean closed;
   private boolean supportFlush;
@@ -93,6 +91,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
   private final int maxRequestsThatCanBeQueued;
 
   private ConcurrentLinkedDeque<WriteOperation> writeOperations;
+  private final ThreadPoolExecutor threadExecutor;
+  private final ExecutorCompletionService<Void> completionService;
 
   // SAS tokens can be re-used until they expire
   private CachedSASToken cachedSasToken;
@@ -103,6 +103,15 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
   private AbfsLease lease;
   private String leaseId;
 
+  /**
+   * Queue storing buffers with the size of the Azure block ready for
+   * reuse. The pool allows reusing the blocks instead of allocating new
+   * blocks. After the data is sent to the service, the buffer is
+   * returned to the queue.
+   */
+  private ElasticByteBufferPool byteBufferPool
+          = new ElasticByteBufferPool();
+
   private final Statistics statistics;
   private final AbfsOutputStreamStatistics outputStreamStatistics;
   private IOStatistics ioStatistics;
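ElasticByteBufferPool is an existing org.apache.hadoop.io utility; the restored code asks it for a heap buffer, writes into the backing array, and hands the buffer back after the append completes. A stripped-down sketch of that lifecycle, with an illustrative buffer size:

import java.nio.ByteBuffer;
import org.apache.hadoop.io.ElasticByteBufferPool;

class BufferPoolSketch {
  public static void main(String[] args) {
    int bufferSize = 4 * 1024 * 1024;  // illustrative, not the ABFS default
    ElasticByteBufferPool pool = new ElasticByteBufferPool();
    // false => heap buffer, so array() is available as the REST payload
    byte[] buffer = pool.getBuffer(false, bufferSize).array();
    // ... fill the buffer and send it to the service ...
    pool.putBuffer(ByteBuffer.wrap(buffer));  // recycle for the next block
  }
}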
@@ -110,27 +119,17 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
   private static final Logger LOG =
       LoggerFactory.getLogger(AbfsOutputStream.class);
 
-  /** Factory for blocks. */
-  private final DataBlocks.BlockFactory blockFactory;
-
-  /** Current data block. Null means none currently active. */
-  private DataBlocks.DataBlock activeBlock;
-
-  /** Count of blocks uploaded. */
-  private long blockCount = 0;
-
-  /** The size of a single block. */
-  private final int blockSize;
-
-  /** Executor service to carry out the parallel upload requests. */
-  private final ListeningExecutorService executorService;
-
-  public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
-      throws IOException {
-    this.client = abfsOutputStreamContext.getClient();
-    this.statistics = abfsOutputStreamContext.getStatistics();
-    this.path = abfsOutputStreamContext.getPath();
-    this.position = abfsOutputStreamContext.getPosition();
+  public AbfsOutputStream(
+          final AbfsClient client,
+          final Statistics statistics,
+          final String path,
+          final long position,
+          AbfsOutputStreamContext abfsOutputStreamContext,
+          TracingContext tracingContext) {
+    this.client = client;
+    this.statistics = statistics;
+    this.path = path;
+    this.position = position;
     this.closed = false;
     this.supportFlush = abfsOutputStreamContext.isEnableFlush();
     this.disableOutputStreamFlush = abfsOutputStreamContext
@@ -141,6 +140,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
     this.lastError = null;
     this.lastFlushOffset = 0;
     this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
+    this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
     this.bufferIndex = 0;
     this.numOfAppendsToServerSinceLastFlush = 0;
     this.writeOperations = new ConcurrentLinkedDeque<>();
@@ -157,20 +157,23 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
 
     this.lease = abfsOutputStreamContext.getLease();
     this.leaseId = abfsOutputStreamContext.getLeaseId();
-    this.executorService =
-        MoreExecutors.listeningDecorator(abfsOutputStreamContext.getExecutorService());
+
+    this.threadExecutor
+        = new ThreadPoolExecutor(maxConcurrentRequestCount,
+        maxConcurrentRequestCount,
+        10L,
+        TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>());
+    this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
     this.cachedSasToken = new CachedSASToken(
         abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
+    if (outputStreamStatistics != null) {
+      this.ioStatistics = outputStreamStatistics.getIOStatistics();
+    }
     this.outputStreamId = createOutputStreamId();
-    this.tracingContext = new TracingContext(abfsOutputStreamContext.getTracingContext());
+    this.tracingContext = new TracingContext(tracingContext);
     this.tracingContext.setStreamID(outputStreamId);
     this.tracingContext.setOperation(FSOperationType.WRITE);
-    this.ioStatistics = outputStreamStatistics.getIOStatistics();
-    this.blockFactory = abfsOutputStreamContext.getBlockFactory();
-    this.blockSize = bufferSize;
-    // create that first block. This guarantees that an open + close sequence
-    // writes a 0-byte entry.
-    createBlockIfNeeded();
   }
 
   private String createOutputStreamId() {
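With this constructor the stream again owns a fixed-size ThreadPoolExecutor wrapped in an ExecutorCompletionService, rather than borrowing the store's shared bounded pool. The JDK wiring reduced to its essentials, with the task body standing in for one append call:

import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CompletionServiceSketch {
  public static void main(String[] args) throws Exception {
    int maxConcurrent = 4;  // stands in for maxConcurrentRequestCount
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        maxConcurrent, maxConcurrent, 10L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>());
    ExecutorCompletionService<Void> service =
        new ExecutorCompletionService<>(pool);
    service.submit(() -> {
      // one buffer upload would run here
      return null;
    });
    service.take().get();  // blocks until a submitted task completes
    pool.shutdown();
  }
}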
@@ -216,10 +219,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
   @Override
   public synchronized void write(final byte[] data, final int off, final int length)
       throws IOException {
-    // validate if data is not null and index out of bounds.
-    DataBlocks.validateWriteArgs(data, off, length);
     maybeThrowLastError();
 
+    Preconditions.checkArgument(data != null, "null data");
+
     if (off < 0 || length < 0 || length > data.length - off) {
       throw new IndexOutOfBoundsException();
     }
@@ -227,182 +230,27 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
     if (hasLease() && isLeaseFreed()) {
       throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
     }
-    DataBlocks.DataBlock block = createBlockIfNeeded();
-    int written = block.write(data, off, length);
-    int remainingCapacity = block.remainingCapacity();
-
-    if (written < length) {
-      // Number of bytes to write is more than the data block capacity,
-      // trigger an upload and then write on the next block.
-      LOG.debug("writing more data than block capacity -triggering upload");
-      uploadCurrentBlock();
-      // tail recursion is mildly expensive, but given buffer sizes must be MB.
-      // it's unlikely to recurse very deeply.
-      this.write(data, off + written, length - written);
-    } else {
-      if (remainingCapacity == 0) {
-        // the whole buffer is done, trigger an upload
-        uploadCurrentBlock();
-      }
-    }
-    incrementWriteOps();
-  }
-
-  /**
-   * Demand create a destination block.
-   *
-   * @return the active block; null if there isn't one.
-   * @throws IOException on any failure to create
-   */
-  private synchronized DataBlocks.DataBlock createBlockIfNeeded()
-      throws IOException {
-    if (activeBlock == null) {
-      blockCount++;
-      activeBlock = blockFactory
-          .create(blockCount, this.blockSize, outputStreamStatistics);
-    }
-    return activeBlock;
-  }
-
-  /**
-   * Start an asynchronous upload of the current block.
-   *
-   * @throws IOException Problems opening the destination for upload,
-   *                     initializing the upload, or if a previous operation has failed.
-   */
-  private synchronized void uploadCurrentBlock() throws IOException {
-    checkState(hasActiveBlock(), "No active block");
-    LOG.debug("Writing block # {}", blockCount);
-    try {
-      uploadBlockAsync(getActiveBlock(), false, false);
-    } finally {
-      // set the block to null, so the next write will create a new block.
-      clearActiveBlock();
-    }
-  }
-
-  /**
-   * Upload a block of data.
-   * This will take the block.
-   *
-   * @param blockToUpload    block to upload.
-   * @throws IOException     upload failure
-   */
-  private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
-      boolean isFlush, boolean isClose)
-      throws IOException {
-    if (this.isAppendBlob) {
-      writeAppendBlobCurrentBufferToService();
-      return;
-    }
-    if (!blockToUpload.hasData()) {
-      return;
-    }
-    numOfAppendsToServerSinceLastFlush++;
-
-    final int bytesLength = blockToUpload.dataSize();
-    final long offset = position;
-    position += bytesLength;
-    outputStreamStatistics.bytesToUpload(bytesLength);
-    outputStreamStatistics.writeCurrentBuffer();
-    DataBlocks.BlockUploadData blockUploadData = blockToUpload.startUpload();
-    final Future<Void> job =
-        executorService.submit(() -> {
-          AbfsPerfTracker tracker =
-              client.getAbfsPerfTracker();
-          try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
-              "writeCurrentBufferToService", "append")) {
-            AppendRequestParameters.Mode
-                mode = APPEND_MODE;
-            if (isFlush & isClose) {
-              mode = FLUSH_CLOSE_MODE;
-            } else if (isFlush) {
-              mode = FLUSH_MODE;
-            }
-            /*
-             * Parameters Required for an APPEND call.
-             * offset(here) - refers to the position in the file.
-             * bytesLength - Data to be uploaded from the block.
-             * mode - If it's append, flush or flush_close.
-             * leaseId - The AbfsLeaseId for this request.
-             */
-            AppendRequestParameters reqParams = new AppendRequestParameters(
-                offset, 0, bytesLength, mode, false, leaseId);
-            AbfsRestOperation op =
-                client.append(path, blockUploadData.toByteArray(), reqParams,
-                    cachedSasToken.get(), new TracingContext(tracingContext));
-            cachedSasToken.update(op.getSasToken());
-            perfInfo.registerResult(op.getResult());
-            perfInfo.registerSuccess(true);
-            outputStreamStatistics.uploadSuccessful(bytesLength);
-            return null;
-          } finally {
-            IOUtils.close(blockUploadData);
-          }
-        });
-    writeOperations.add(new WriteOperation(job, offset, bytesLength));
 
-    // Try to shrink the queue
-    shrinkWriteOperationQueue();
-  }
-
-  /**
-   * A method to set the lastError if an exception is caught.
-   * @param ex Exception caught.
-   * @throws IOException Throws the lastError.
-   */
-  private void failureWhileSubmit(Exception ex) throws IOException {
-    if (ex instanceof AbfsRestOperationException) {
-      if (((AbfsRestOperationException) ex).getStatusCode()
-          == HttpURLConnection.HTTP_NOT_FOUND) {
-        throw new FileNotFoundException(ex.getMessage());
+    int currentOffset = off;
+    int writableBytes = bufferSize - bufferIndex;
+    int numberOfBytesToWrite = length;
+
+    while (numberOfBytesToWrite > 0) {
+      if (writableBytes <= numberOfBytesToWrite) {
+        System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
+        bufferIndex += writableBytes;
+        writeCurrentBufferToService();
+        currentOffset += writableBytes;
+        numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
+      } else {
+        System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
+        bufferIndex += numberOfBytesToWrite;
+        numberOfBytesToWrite = 0;
       }
-    }
-    if (ex instanceof IOException) {
-      lastError = (IOException) ex;
-    } else {
-      lastError = new IOException(ex);
-    }
-    throw lastError;
-  }
-
-  /**
-   * Synchronized accessor to the active block.
-   *
-   * @return the active block; null if there isn't one.
-   */
-  private synchronized DataBlocks.DataBlock getActiveBlock() {
-    return activeBlock;
-  }
-
-  /**
-   * Predicate to query whether or not there is an active block.
-   *
-   * @return true if there is an active block.
-   */
-  private synchronized boolean hasActiveBlock() {
-    return activeBlock != null;
-  }
 
-  /**
-   * Is there an active block and is there any data in it to upload?
-   *
-   * @return true if there is some data to upload in an active block else false.
-   */
-  private boolean hasActiveBlockDataToUpload() {
-    return hasActiveBlock() && getActiveBlock().hasData();
-  }
-
-  /**
-   * Clear the active block.
-   */
-  private void clearActiveBlock() {
-    if (activeBlock != null) {
-      LOG.debug("Clearing active block");
-    }
-    synchronized (this) {
-      activeBlock = null;
+      writableBytes = bufferSize - bufferIndex;
     }
+    incrementWriteOps();
   }
 
   /**
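The restored write() path is plain chunked copying: fill the fixed buffer, ship it whenever it becomes full, and carry on with whatever remains. The same loop in isolation, with flush() standing in for writeCurrentBufferToService():

/**
 * Sketch of the restored buffering logic: copy into a fixed buffer and
 * flush every time it fills.
 */
class BufferedChunkWriter {
  private final byte[] buffer;
  private int bufferIndex;

  BufferedChunkWriter(int bufferSize) {
    this.buffer = new byte[bufferSize];
  }

  synchronized void write(byte[] data, int off, int length) {
    int currentOffset = off;
    int remaining = length;
    int writable = buffer.length - bufferIndex;
    while (remaining > 0) {
      if (writable <= remaining) {
        // Fill the buffer to the brim, then ship it.
        System.arraycopy(data, currentOffset, buffer, bufferIndex, writable);
        bufferIndex += writable;
        flush();
        currentOffset += writable;
        remaining -= writable;
      } else {
        // The rest fits: stash it and wait for more writes.
        System.arraycopy(data, currentOffset, buffer, bufferIndex, remaining);
        bufferIndex += remaining;
        remaining = 0;
      }
      writable = buffer.length - bufferIndex;
    }
  }

  private void flush() {
    // stands in for writeCurrentBufferToService(): submit the append,
    // swap in a fresh pooled buffer, reset the index
    bufferIndex = 0;
  }
}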
@@ -487,6 +335,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
 
     try {
       flushInternal(true);
+      threadExecutor.shutdown();
     } catch (IOException e) {
       // Problems surface in try-with-resources clauses if
       // the exception thrown in a close == the one already thrown
@@ -503,8 +352,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
       bufferIndex = 0;
       closed = true;
       writeOperations.clear();
-      if (hasActiveBlock()) {
-        clearActiveBlock();
+      byteBufferPool = null;
+      if (!threadExecutor.isShutdown()) {
+        threadExecutor.shutdownNow();
       }
     }
     LOG.debug("Closing AbfsOutputStream : {}", this);
@@ -518,22 +368,19 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
         && enableSmallWriteOptimization
         && (numOfAppendsToServerSinceLastFlush == 0) // there are no ongoing store writes
         && (writeOperations.size() == 0) // double checking no appends in progress
-        && hasActiveBlockDataToUpload()) { // there is
-      // some data that is pending to be written
+        && (bufferIndex > 0)) { // there is some data that is pending to be written
       smallWriteOptimizedflushInternal(isClose);
       return;
     }
 
-    if (hasActiveBlockDataToUpload()) {
-      uploadCurrentBlock();
-    }
+    writeCurrentBufferToService();
     flushWrittenBytesToService(isClose);
     numOfAppendsToServerSinceLastFlush = 0;
   }
 
   private synchronized void smallWriteOptimizedflushInternal(boolean isClose) throws IOException {
     // writeCurrentBufferToService will increment numOfAppendsToServerSinceLastFlush
-    uploadBlockAsync(getActiveBlock(), true, isClose);
+    writeCurrentBufferToService(true, isClose);
     waitForAppendsToComplete();
     shrinkWriteOperationQueue();
     maybeThrowLastError();
@@ -542,60 +389,131 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
 
   private synchronized void flushInternalAsync() throws IOException {
     maybeThrowLastError();
-    if (hasActiveBlockDataToUpload()) {
-      uploadCurrentBlock();
-    }
-    waitForAppendsToComplete();
+    writeCurrentBufferToService();
     flushWrittenBytesToServiceAsync();
   }
 
-  /**
-   * Appending the current active data block to service. Clearing the active
-   * data block and releasing all buffered data.
-   * @throws IOException if there is any failure while starting an upload for
-   *                     the dataBlock or while closing the BlockUploadData.
-   */
   private void writeAppendBlobCurrentBufferToService() throws IOException {
-    DataBlocks.DataBlock activeBlock = getActiveBlock();
-    // No data, return.
-    if (!hasActiveBlockDataToUpload()) {
+    if (bufferIndex == 0) {
       return;
     }
-
-    final int bytesLength = activeBlock.dataSize();
-    DataBlocks.BlockUploadData uploadData = activeBlock.startUpload();
-    clearActiveBlock();
-    outputStreamStatistics.writeCurrentBuffer();
-    outputStreamStatistics.bytesToUpload(bytesLength);
+    final byte[] bytes = buffer;
+    final int bytesLength = bufferIndex;
+    if (outputStreamStatistics != null) {
+      outputStreamStatistics.writeCurrentBuffer();
+      outputStreamStatistics.bytesToUpload(bytesLength);
+    }
+    buffer = byteBufferPool.getBuffer(false, bufferSize).array();
+    bufferIndex = 0;
     final long offset = position;
     position += bytesLength;
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
-        "writeCurrentBufferToService", "append")) {
+            "writeCurrentBufferToService", "append")) {
       AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
           bytesLength, APPEND_MODE, true, leaseId);
-      AbfsRestOperation op = client.append(path, uploadData.toByteArray(), reqParams,
-          cachedSasToken.get(), new TracingContext(tracingContext));
+      AbfsRestOperation op = client
+          .append(path, bytes, reqParams, cachedSasToken.get(),
+              new TracingContext(tracingContext));
       cachedSasToken.update(op.getSasToken());
-      outputStreamStatistics.uploadSuccessful(bytesLength);
-
+      if (outputStreamStatistics != null) {
+        outputStreamStatistics.uploadSuccessful(bytesLength);
+      }
       perfInfo.registerResult(op.getResult());
+      byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
       perfInfo.registerSuccess(true);
       return;
     } catch (Exception ex) {
-      outputStreamStatistics.uploadFailed(bytesLength);
-      failureWhileSubmit(ex);
-    } finally {
-      IOUtils.close(uploadData);
+      if (ex instanceof AbfsRestOperationException) {
+        if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+          throw new FileNotFoundException(ex.getMessage());
+        }
+      }
+      if (ex instanceof AzureBlobFileSystemException) {
+        ex = (AzureBlobFileSystemException) ex;
+      }
+      lastError = new IOException(ex);
+      throw lastError;
     }
   }
 
+  private synchronized void writeCurrentBufferToService() throws IOException {
+    writeCurrentBufferToService(false, false);
+  }
+
+  private synchronized void writeCurrentBufferToService(boolean isFlush, boolean isClose) throws IOException {
+    if (this.isAppendBlob) {
+      writeAppendBlobCurrentBufferToService();
+      return;
+    }
+
+    if (bufferIndex == 0) {
+      return;
+    }
+    numOfAppendsToServerSinceLastFlush++;
+
+    final byte[] bytes = buffer;
+    final int bytesLength = bufferIndex;
+    if (outputStreamStatistics != null) {
+      outputStreamStatistics.writeCurrentBuffer();
+      outputStreamStatistics.bytesToUpload(bytesLength);
+    }
+    buffer = byteBufferPool.getBuffer(false, bufferSize).array();
+    bufferIndex = 0;
+    final long offset = position;
+    position += bytesLength;
+
+    if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
+      //Tracking time spent on waiting for task to complete.
+      if (outputStreamStatistics != null) {
+        try (DurationTracker ignored = outputStreamStatistics.timeSpentTaskWait()) {
+          waitForTaskToComplete();
+        }
+      } else {
+        waitForTaskToComplete();
+      }
+    }
+    final Future<Void> job = completionService.submit(() -> {
+      AbfsPerfTracker tracker = client.getAbfsPerfTracker();
+          try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
+              "writeCurrentBufferToService", "append")) {
+            AppendRequestParameters.Mode
+                mode = APPEND_MODE;
+            if (isFlush & isClose) {
+              mode = FLUSH_CLOSE_MODE;
+            } else if (isFlush) {
+              mode = FLUSH_MODE;
+            }
+            AppendRequestParameters reqParams = new AppendRequestParameters(
+                offset, 0, bytesLength, mode, false, leaseId);
+            AbfsRestOperation op = client.append(path, bytes, reqParams,
+                cachedSasToken.get(), new TracingContext(tracingContext));
+            cachedSasToken.update(op.getSasToken());
+            perfInfo.registerResult(op.getResult());
+            byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
+            perfInfo.registerSuccess(true);
+            return null;
+          }
+        });
+
+    if (outputStreamStatistics != null) {
+      if (job.isCancelled()) {
+        outputStreamStatistics.uploadFailed(bytesLength);
+      } else {
+        outputStreamStatistics.uploadSuccessful(bytesLength);
+      }
+    }
+    writeOperations.add(new WriteOperation(job, offset, bytesLength));
+
+    // Try to shrink the queue
+    shrinkWriteOperationQueue();
+  }
+
   private synchronized void waitForAppendsToComplete() throws IOException {
     for (WriteOperation writeOperation : writeOperations) {
       try {
         writeOperation.task.get();
       } catch (Exception ex) {
-        outputStreamStatistics.uploadFailed(writeOperation.length);
         if (ex.getCause() instanceof AbfsRestOperationException) {
           if (((AbfsRestOperationException) ex.getCause()).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
             throw new FileNotFoundException(ex.getMessage());
@@ -645,8 +563,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
           throw new FileNotFoundException(ex.getMessage());
         }
       }
-      lastError = new IOException(ex);
-      throw lastError;
+      throw new IOException(ex);
     }
     this.lastFlushOffset = offset;
   }
@@ -657,14 +574,14 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
    */
   private synchronized void shrinkWriteOperationQueue() throws IOException {
     try {
-      WriteOperation peek = writeOperations.peek();
-      while (peek != null && peek.task.isDone()) {
-        peek.task.get();
-        lastTotalAppendOffset += peek.length;
+      while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
+        writeOperations.peek().task.get();
+        lastTotalAppendOffset += writeOperations.peek().length;
         writeOperations.remove();
-        peek = writeOperations.peek();
         // Incrementing statistics to indicate queue has been shrunk.
-        outputStreamStatistics.queueShrunk();
+        if (outputStreamStatistics != null) {
+          outputStreamStatistics.queueShrunk();
+        }
       }
     } catch (Exception e) {
       if (e.getCause() instanceof AzureBlobFileSystemException) {
@@ -676,6 +593,26 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
     }
   }
 
+  private void waitForTaskToComplete() throws IOException {
+    boolean completed;
+    for (completed = false; completionService.poll() != null; completed = true) {
+      // keep polling until there is no data
+    }
+    // for AppendBLob, jobs are not submitted to completion service
+    if (isAppendBlob) {
+      completed = true;
+    }
+
+    if (!completed) {
+      try {
+        completionService.take();
+      } catch (InterruptedException e) {
+        lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
+        throw lastError;
+      }
+    }
+  }
+
   private static class WriteOperation {
     private final Future<Void> task;
     private final long startOffset;
@@ -694,7 +631,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
 
   @VisibleForTesting
   public synchronized void waitForPendingUploads() throws IOException {
-    waitForAppendsToComplete();
+    waitForTaskToComplete();
   }
 
   /**
@@ -758,10 +695,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(super.toString());
-    sb.append("AbfsOutputStream@").append(this.hashCode());
-    sb.append("){");
-    sb.append(outputStreamStatistics.toString());
-    sb.append("}");
+    if (outputStreamStatistics != null) {
+      sb.append("AbfsOutputStream@").append(this.hashCode());
+      sb.append("){");
+      sb.append(outputStreamStatistics.toString());
+      sb.append("}");
+    }
     return sb.toString();
   }
 }

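Backpressure in this version comes from waitForTaskToComplete(): before queuing another append once the executor queue reaches maxRequestsThatCanBeQueued, the stream reaps any finished tasks and, if none have finished yet, blocks on one. Task failures are not inspected here; they surface later through shrinkWriteOperationQueue(). That step in isolation:

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutorCompletionService;

final class BackpressureSketch {
  static void waitForTaskToComplete(ExecutorCompletionService<Void> service)
      throws IOException {
    boolean completed = false;
    // Reap every task that has already finished, without blocking.
    while (service.poll() != null) {
      completed = true;
    }
    if (!completed) {
      try {
        service.take();  // block for exactly one completion
      } catch (InterruptedException e) {
        throw (IOException) new InterruptedIOException(e.toString())
            .initCause(e);
      }
    }
  }
}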
+ 0 - 107
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java

@@ -18,12 +18,6 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
-import java.util.concurrent.ExecutorService;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
-import org.apache.hadoop.fs.store.DataBlocks;
-
 /**
  * Class to hold extra output stream configs.
  */
@@ -47,22 +41,6 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
 
   private AbfsLease lease;
 
-  private DataBlocks.BlockFactory blockFactory;
-
-  private int blockOutputActiveBlocks;
-
-  private AbfsClient client;
-
-  private long position;
-
-  private FileSystem.Statistics statistics;
-
-  private String path;
-
-  private ExecutorService executorService;
-
-  private TracingContext tracingContext;
-
   public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -101,64 +79,11 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
     return this;
   }
 
-  public AbfsOutputStreamContext withBlockFactory(
-      final DataBlocks.BlockFactory blockFactory) {
-    this.blockFactory = blockFactory;
-    return this;
-  }
-
-  public AbfsOutputStreamContext withBlockOutputActiveBlocks(
-      final int blockOutputActiveBlocks) {
-    this.blockOutputActiveBlocks = blockOutputActiveBlocks;
-    return this;
-  }
-
-
-  public AbfsOutputStreamContext withClient(
-      final AbfsClient client) {
-    this.client = client;
-    return this;
-  }
-
-  public AbfsOutputStreamContext withPosition(
-      final long position) {
-    this.position = position;
-    return this;
-  }
-
-  public AbfsOutputStreamContext withFsStatistics(
-      final FileSystem.Statistics statistics) {
-    this.statistics = statistics;
-    return this;
-  }
-
-  public AbfsOutputStreamContext withPath(
-      final String path) {
-    this.path = path;
-    return this;
-  }
-
-  public AbfsOutputStreamContext withExecutorService(
-      final ExecutorService executorService) {
-    this.executorService = executorService;
-    return this;
-  }
-
-  public AbfsOutputStreamContext withTracingContext(
-      final TracingContext tracingContext) {
-    this.tracingContext = tracingContext;
-    return this;
-  }
-
   public AbfsOutputStreamContext build() {
     // Validation of parameters to be done here.
-    if (streamStatistics == null) {
-      streamStatistics = new AbfsOutputStreamStatisticsImpl();
-    }
     return this;
   }
 
-
   public AbfsOutputStreamContext withWriteMaxConcurrentRequestCount(
       final int writeMaxConcurrentRequestCount) {
     this.writeMaxConcurrentRequestCount = writeMaxConcurrentRequestCount;
@@ -218,36 +143,4 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
     }
     return this.lease.getLeaseID();
   }
-
-  public DataBlocks.BlockFactory getBlockFactory() {
-    return blockFactory;
-  }
-
-  public int getBlockOutputActiveBlocks() {
-    return blockOutputActiveBlocks;
-  }
-
-  public AbfsClient getClient() {
-    return client;
-  }
-
-  public FileSystem.Statistics getStatistics() {
-    return statistics;
-  }
-
-  public String getPath() {
-    return path;
-  }
-
-  public long getPosition() {
-    return position;
-  }
-
-  public ExecutorService getExecutorService() {
-    return executorService;
-  }
-
-  public TracingContext getTracingContext() {
-    return tracingContext;
-  }
 }

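After the revert the context carries only stream tuning and the lease, not the client, path, statistics, or executor. Construction stays fluent; a sketch using only the setters visible in this diff, with illustrative values rather than the ABFS defaults:

import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;

class ContextSketch {
  static AbfsOutputStreamContext newContext(AbfsLease lease) {
    return new AbfsOutputStreamContext(120)    // SAS renew period, seconds
        .withWriteBufferSize(8 * 1024 * 1024)  // illustrative buffer size
        .enableFlush(true)
        .withAppendBlob(false)
        .withWriteMaxConcurrentRequestCount(8)
        .withMaxWriteRequestsToQueue(16)
        .withLease(lease)
        .build();
  }
}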
+ 1 - 3
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java

@@ -22,14 +22,12 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
-import org.apache.hadoop.fs.store.BlockUploadStatistics;
 
 /**
  * Interface for {@link AbfsOutputStream} statistics.
  */
 @InterfaceStability.Unstable
-public interface AbfsOutputStreamStatistics extends IOStatisticsSource,
-    BlockUploadStatistics {
+public interface AbfsOutputStreamStatistics extends IOStatisticsSource {
 
   /**
    * Number of bytes to be uploaded.

+ 1 - 24
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java

@@ -42,9 +42,7 @@ public class AbfsOutputStreamStatisticsImpl
           StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL,
           StreamStatisticNames.BYTES_UPLOAD_FAILED,
           StreamStatisticNames.QUEUE_SHRUNK_OPS,
-          StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS,
-          StreamStatisticNames.BLOCKS_ALLOCATED,
-          StreamStatisticNames.BLOCKS_RELEASED
+          StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS
       )
       .withDurationTracking(
           StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST,
@@ -62,11 +60,6 @@ public class AbfsOutputStreamStatisticsImpl
   private final AtomicLong writeCurrentBufferOps =
       ioStatisticsStore.getCounterReference(StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS);
 
-  private final AtomicLong blocksAllocated =
-      ioStatisticsStore.getCounterReference(StreamStatisticNames.BLOCKS_ALLOCATED);
-  private final AtomicLong blocksReleased =
-      ioStatisticsStore.getCounterReference(StreamStatisticNames.BLOCKS_RELEASED);
-
   /**
    * Records the need to upload bytes and increments the total bytes that
    * needs to be uploaded.
@@ -140,22 +133,6 @@ public class AbfsOutputStreamStatisticsImpl
     writeCurrentBufferOps.incrementAndGet();
   }
 
-  /**
-   * Increment the counter to indicate a block has been allocated.
-   */
-  @Override
-  public void blockAllocated() {
-    blocksAllocated.incrementAndGet();
-  }
-
-  /**
-   * Increment the counter to indicate a block has been released.
-   */
-  @Override
-  public void blockReleased() {
-    blocksReleased.incrementAndGet();
-  }
-
   /**
    * {@inheritDoc}
    *

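The counters that remain follow the same pattern as the two deleted ones: take an AtomicLong reference out of the IOStatisticsStore once, then bump it on every event. The pattern reduced to a plain AtomicLong:

import java.util.concurrent.atomic.AtomicLong;

/**
 * Sketch of the counter pattern above: hold a long-lived AtomicLong
 * reference and increment it on each event.
 */
class CounterSketch {
  private final AtomicLong queueShrunkOps = new AtomicLong();

  void queueShrunk() {
    queueShrunkOps.incrementAndGet();
  }

  long value() {
    return queueShrunkOps.get();
  }
}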
+ 0 - 13
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestConstants.java

@@ -20,8 +20,6 @@ package org.apache.hadoop.fs.azure.integration;
 
 import org.apache.hadoop.fs.Path;
 
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
-
 /**
  * Constants for the Azure tests.
  */
@@ -177,15 +175,4 @@ public interface AzureTestConstants {
    * Base directory for page blobs.
    */
   Path PAGE_BLOB_DIR = new Path("/" + DEFAULT_PAGE_BLOB_DIRECTORY);
-
-  /**
-   * Huge file for testing AbfsOutputStream uploads: {@value}
-   */
-  String AZURE_SCALE_HUGE_FILE_UPLOAD = AZURE_SCALE_TEST + "huge.upload";
-
-  /**
-   * Default value for Huge file to be tested for AbfsOutputStream uploads:
-   * {@value}
-   */
-  int AZURE_SCALE_HUGE_FILE_UPLOAD_DEFAULT = 2 * DEFAULT_WRITE_BUFFER_SIZE;
 }

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java

@@ -488,7 +488,7 @@ public abstract class AbstractAbfsIntegrationTest extends
    */
   protected AbfsOutputStream createAbfsOutputStreamWithFlushEnabled(
       AzureBlobFileSystem fs,
-      Path path) throws IOException {
+      Path path) throws AzureBlobFileSystemException {
     AzureBlobFileSystemStore abfss = fs.getAbfsStore();
     abfss.getAbfsConfiguration().setDisableOutputStreamFlush(false);
 

+ 1 - 16
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsScaleTest.java

@@ -24,10 +24,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
 
-import static org.apache.hadoop.fs.azure.integration.AzureTestConstants.AZURE_SCALE_HUGE_FILE_UPLOAD;
-import static org.apache.hadoop.fs.azure.integration.AzureTestConstants.AZURE_SCALE_HUGE_FILE_UPLOAD_DEFAULT;
 import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled;
-import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.getTestPropertyInt;
 
 /**
  * Integration tests at bigger scale; configurable as to
@@ -37,7 +34,6 @@ public class AbstractAbfsScaleTest extends AbstractAbfsIntegrationTest  {
 
   protected static final Logger LOG =
       LoggerFactory.getLogger(AbstractAbfsScaleTest.class);
-  private static Configuration rawConfiguration;
 
   public AbstractAbfsScaleTest() throws Exception {
     super();
@@ -52,7 +48,7 @@ public class AbstractAbfsScaleTest extends AbstractAbfsIntegrationTest  {
   public void setup() throws Exception {
     super.setup();
     LOG.debug("Scale test operation count = {}", getOperationCount());
-    rawConfiguration = getRawConfiguration();
+    Configuration rawConfiguration = getRawConfiguration();
     assumeScaleTestsEnabled(rawConfiguration);
   }
 
@@ -60,15 +56,4 @@ public class AbstractAbfsScaleTest extends AbstractAbfsIntegrationTest  {
     return getConfiguration().getLong(AzureTestConstants.KEY_OPERATION_COUNT,
         AzureTestConstants.DEFAULT_OPERATION_COUNT);
   }
-
-
-  /**
-   * Method to get the Huge file for upload value for scale test.
-   * @return the huge value set.
-   */
-  public static int getHugeFileUploadValue() {
-    return getTestPropertyInt(rawConfiguration,
-        AZURE_SCALE_HUGE_FILE_UPLOAD,
-        AZURE_SCALE_HUGE_FILE_UPLOAD_DEFAULT);
-  }
 }

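The deleted accessor layered a test-supplied value over a code default via AzureTestUtils.getTestPropertyInt. A sketch of the equivalent lookup against a Configuration; the key string below is illustrative, since the AZURE_SCALE_TEST prefix is defined outside this diff:

import org.apache.hadoop.conf.Configuration;

class ScaleTestProps {
  /** Illustrative key: the real one is AZURE_SCALE_TEST + "huge.upload". */
  static final String HUGE_UPLOAD_KEY = "fs.azure.scale.test.huge.upload";

  static int hugeFileUploadSize(Configuration conf, int defaultSize) {
    return conf.getInt(HUGE_UPLOAD_KEY, defaultSize);
  }
}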
+ 0 - 97
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsHugeFiles.java

@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Random;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-
-import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assume;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
-
-/**
- * Testing Huge file for AbfsOutputStream.
- */
-@RunWith(Parameterized.class)
-public class ITestAbfsHugeFiles extends AbstractAbfsScaleTest {
-  private static final int ONE_MB = 1024 * 1024;
-  private static final int EIGHT_MB = 8 * ONE_MB;
-  private final int size;
-
-  @Parameterized.Parameters(name = "Size={0}")
-  public static Iterable<Object[]> sizes() {
-    return Arrays.asList(new Object[][] {
-        { DEFAULT_WRITE_BUFFER_SIZE },
-        { getHugeFileUploadValue() } });
-  }
-
-  public ITestAbfsHugeFiles(int size) throws Exception {
-    this.size = size;
-  }
-
-  /**
-   * Testing Huge files written at once on AbfsOutputStream.
-   */
-  @Test
-  public void testHugeFileWrite() throws IOException {
-    AzureBlobFileSystem fs = getFileSystem();
-    Path filePath = path(getMethodName());
-    final byte[] b = new byte[size];
-    new Random().nextBytes(b);
-    try (FSDataOutputStream out = fs.create(filePath)) {
-      out.write(b);
-    }
-    // Verify correct length was uploaded. Don't want to verify contents
-    // here, as this would increase the test time significantly.
-    assertEquals("Mismatch in content length of file uploaded", size,
-        fs.getFileStatus(filePath).getLen());
-  }
-
-  /**
-   * Testing Huge files written in chunks of 8M in lots of writes.
-   */
-  @Test
-  public void testLotsOfWrites() throws IOException {
-    assume("If the size isn't a multiple of 8M this test would not pass, so "
-        + "skip", size % EIGHT_MB == 0);
-    AzureBlobFileSystem fs = getFileSystem();
-    Path filePath = path(getMethodName());
-    final byte[] b = new byte[size];
-    new Random().nextBytes(b);
-    try (FSDataOutputStream out = fs.create(filePath)) {
-      int offset = 0;
-      for (int i = 0; i < size / EIGHT_MB; i++) {
-        out.write(b, offset, EIGHT_MB);
-        offset += EIGHT_MB;
-      }
-    }
-    LOG.info(String.valueOf(size % EIGHT_MB));
-    // Verify correct length was uploaded. Don't want to verify contents
-    // here, as this would increase the test time significantly.
-    assertEquals("Mismatch in content length of file uploaded", size,
-        fs.getFileStatus(filePath).getLen());
-  }
-}

+ 7 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java

@@ -48,6 +48,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE
 import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_EXPIRED;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_NOT_PRESENT;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_ID_SPECIFIED;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS;
 import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED;
@@ -294,10 +295,15 @@ public class ITestAzureBlobFileSystemLease extends AbstractAbfsIntegrationTest {
     FSDataOutputStream out = fs.create(testFilePath);
     out.write(0);
     Assert.assertFalse("Store leases should exist", fs.getAbfsStore().areLeasesFreed());
-    out.close();
     fs.close();
     Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
 
+    LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? ERR_LEASE_NOT_PRESENT
+        : ERR_LEASE_EXPIRED, () -> {
+      out.close();
+      return "Expected exception on close after closed FS but got " + out;
+    });
+
     LambdaTestUtils.intercept(RejectedExecutionException.class, () -> {
       try (FSDataOutputStream out2 = fs.append(testFilePath)) {
       }

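LambdaTestUtils.intercept fails the test unless the lambda throws the named exception containing the given text, which is how the added assertion pins down the close-after-closed-filesystem behaviour. A self-contained sketch of the idiom; the error text and the failing call are illustrative:

import java.io.IOException;
import org.apache.hadoop.test.LambdaTestUtils;

class InterceptSketch {
  public static void main(String[] args) throws Exception {
    LambdaTestUtils.intercept(IOException.class, "lease", () -> {
      failingClose();  // stands in for out.close() on the closed stream
      return "close unexpectedly succeeded";
    });
  }

  static void failingClose() throws IOException {
    throw new IOException("lease expired");  // illustrative error text
  }
}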
+ 35 - 141
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java

@@ -21,28 +21,18 @@ package org.apache.hadoop.fs.azurebfs.services;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 
 import org.mockito.ArgumentCaptor;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
-import org.apache.hadoop.fs.store.DataBlocks;
-import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
-import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
-
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
-import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
+
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.ArgumentMatchers.refEq;
@@ -68,27 +58,12 @@ public final class TestAbfsOutputStream {
   private final String accountKey1 = globalKey + "." + accountName1;
   private final String accountValue1 = "one";
 
-  private AbfsOutputStreamContext populateAbfsOutputStreamContext(
-      int writeBufferSize,
-      boolean isFlushEnabled,
-      boolean disableOutputStreamFlush,
-      boolean isAppendBlob,
-      AbfsClient client,
-      FileSystem.Statistics statistics,
-      String path,
-      TracingContext tracingContext,
-      ExecutorService executorService) throws IOException,
-      IllegalAccessException {
+  private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferSize,
+            boolean isFlushEnabled,
+            boolean disableOutputStreamFlush,
+            boolean isAppendBlob) throws IOException, IllegalAccessException {
     AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(),
         accountName1);
-    String blockFactoryName =
-        abfsConf.getRawConfiguration().getTrimmed(DATA_BLOCKS_BUFFER,
-        DATA_BLOCKS_BUFFER_DEFAULT);
-    DataBlocks.BlockFactory blockFactory =
-        DataBlocks.createFactory(FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR,
-            abfsConf.getRawConfiguration(),
-        blockFactoryName);
-
     return new AbfsOutputStreamContext(2)
             .withWriteBufferSize(writeBufferSize)
             .enableFlush(isFlushEnabled)
@@ -97,12 +72,6 @@ public final class TestAbfsOutputStream {
             .withAppendBlob(isAppendBlob)
             .withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount())
             .withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue())
-            .withClient(client)
-            .withPath(path)
-            .withFsStatistics(statistics)
-            .withTracingContext(tracingContext)
-            .withExecutorService(executorService)
-            .withBlockFactory(blockFactory)
             .build();
   }
 
@@ -126,19 +95,11 @@ public final class TestAbfsOutputStream {
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(),
         isNull(), any(TracingContext.class))).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(
-        populateAbfsOutputStreamContext(
-            BUFFER_SIZE,
-            true,
-            false,
-            false,
-            client,
-            null,
-            PATH,
-            new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
-                FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
-                null),
-            createExecutorService(abfsConf)));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false),
+        new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
+        FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
+        null));
     final byte[] b = new byte[WRITE_SIZE];
     new Random().nextBytes(b);
     out.write(b);
@@ -188,17 +149,9 @@ public final class TestAbfsOutputStream {
     when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), any(TracingContext.class))).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(TracingContext.class))).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(
-        populateAbfsOutputStreamContext(
-            BUFFER_SIZE,
-            true,
-            false,
-            false,
-            client,
-            null,
-            PATH,
-            tracingContext,
-            createExecutorService(abfsConf)));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false),
+        tracingContext);
     final byte[] b = new byte[WRITE_SIZE];
     new Random().nextBytes(b);
 
@@ -263,17 +216,9 @@ public final class TestAbfsOutputStream {
     when(op.getSasToken()).thenReturn("testToken");
     when(op.getResult()).thenReturn(httpOp);
 
-    AbfsOutputStream out = new AbfsOutputStream(
-        populateAbfsOutputStreamContext(
-            BUFFER_SIZE,
-            true,
-            false,
-            false,
-            client,
-            null,
-            PATH,
-            tracingContext,
-            createExecutorService(abfsConf)));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false),
+        tracingContext);
     final byte[] b = new byte[BUFFER_SIZE];
     new Random().nextBytes(b);
 
@@ -335,19 +280,11 @@ public final class TestAbfsOutputStream {
     when(op.getSasToken()).thenReturn("testToken");
     when(op.getResult()).thenReturn(httpOp);
 
-    AbfsOutputStream out = new AbfsOutputStream(
-        populateAbfsOutputStreamContext(
-            BUFFER_SIZE,
-            true,
-            false,
-            false,
-            client,
-            null,
-            PATH,
-            new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
-                FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
-                null),
-            createExecutorService(abfsConf)));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false),
+        new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
+            FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
+            null));
     final byte[] b = new byte[BUFFER_SIZE];
     new Random().nextBytes(b);
 
@@ -391,19 +328,11 @@ public final class TestAbfsOutputStream {
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(),
         isNull(), any(TracingContext.class))).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(
-        populateAbfsOutputStreamContext(
-            BUFFER_SIZE,
-            true,
-            false,
-            true,
-            client,
-            null,
-            PATH,
-            new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
-                FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(),
-                null),
-            createExecutorService(abfsConf)));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true),
+        new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
+            FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(),
+            null));
     final byte[] b = new byte[BUFFER_SIZE];
     new Random().nextBytes(b);
 
@@ -451,19 +380,10 @@ public final class TestAbfsOutputStream {
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(),
         isNull(), any(TracingContext.class))).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(
-        populateAbfsOutputStreamContext(
-            BUFFER_SIZE,
-            true,
-            false,
-            false,
-            client,
-            null,
-            PATH,
-            new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
-                FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(),
-                null),
-            createExecutorService(abfsConf)));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false), new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
+        FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(),
+        null));
     final byte[] b = new byte[BUFFER_SIZE];
     new Random().nextBytes(b);
 
@@ -521,19 +441,11 @@ public final class TestAbfsOutputStream {
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(),
         isNull(), any(TracingContext.class))).thenReturn(op);
 
-    AbfsOutputStream out = new AbfsOutputStream(
-        populateAbfsOutputStreamContext(
-            BUFFER_SIZE,
-            true,
-            false,
-            false,
-            client,
-            null,
-            PATH,
-            new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
-                FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
-                null),
-            createExecutorService(abfsConf)));
+    AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
+        populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false),
+        new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
+            FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
+            null));
     final byte[] b = new byte[BUFFER_SIZE];
     new Random().nextBytes(b);
 
@@ -557,22 +469,4 @@ public final class TestAbfsOutputStream {
     verify(client, times(2)).append(
         eq(PATH), any(byte[].class), any(), any(), any(TracingContext.class));
   }
-
-  /**
-   * Method to create an executor Service for AbfsOutputStream.
-   * @param abfsConf Configuration.
-   * @return ExecutorService.
-   */
-  private ExecutorService createExecutorService(
-      AbfsConfiguration abfsConf) {
-    ExecutorService executorService =
-        new SemaphoredDelegatingExecutor(BlockingThreadPoolExecutorService.newInstance(
-            abfsConf.getWriteMaxConcurrentRequestCount(),
-            abfsConf.getMaxWriteRequestsToQueue(),
-            10L, TimeUnit.SECONDS,
-            "abfs-test-bounded"),
-            BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT, true);
-    return executorService;
-  }
-
 }
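The tests above all follow one pattern: mock AbfsClient, drive the stream, then use ArgumentCaptor to inspect the AppendRequestParameters each append() received. The same pattern on a toy interface; Uploader is an illustrative stand-in, not the ABFS API:

import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.mockito.ArgumentCaptor;

public class CaptorSketch {
  /** Illustrative stand-in for the mocked AbfsClient. */
  interface Uploader {
    void append(String path, byte[] data, int length);
  }

  public static void main(String[] args) {
    Uploader client = mock(Uploader.class);
    client.append("/testfile", new byte[]{1, 2, 3}, 3);

    // Capture the length argument passed to the mocked call and check it.
    ArgumentCaptor<Integer> lengths = ArgumentCaptor.forClass(Integer.class);
    verify(client, times(1))
        .append(eq("/testfile"), any(byte[].class), lengths.capture());
    System.out.println("captured length = " + lengths.getValue());
  }
}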