Browse Source

HADOOP-13403. AzureNativeFileSystem rename/delete performance improvements. Contributed by Subramanyam Pattipaka.

Chris Nauroth 8 years ago
parent
commit
2ed58c40e5

+ 346 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureFileSystemThreadPoolExecutor.java

@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+class AzureFileSystemThreadPoolExecutor {
+
+  public static final Logger LOG = LoggerFactory.getLogger(AzureFileSystemThreadPoolExecutor.class);
+
+  /*
+   * Number of threads to keep in the pool.
+   */
+  private int threadCount;
+
+  /*
+   * Prefix to be used for naming threads.
+   */
+  private String threadNamePrefix;
+
+  /*
+   * File system operation like delete/rename. Used for logging purpose.
+   */
+  private String operation;
+
+  /*
+   * Source blob key used for file operation. Used for logging purpose.
+   */
+  private String key;
+
+  /*
+   * Configuration name for recommendations. Used for logging purpose.
+   */
+  private String config;
+
+  /**
+   * Creates a new AzureFileSystemThreadPoolExecutor object.
+   *
+   * @param threadCount
+   *        Number of threads to be used after reading user configuration.
+   * @param threadNamePrefix
+   *        Prefix to be used to name threads for the file operation.
+   * @param operation
+   *        File system operation like delete/rename. Used for logging purpose.
+   * @param key
+   *        Source blob key used for file operation. Used for logging purpose.
+   * @param config
+   *        Configuration name for recommendations. Used for logging purpose.
+   */
+  public AzureFileSystemThreadPoolExecutor(int threadCount, String threadNamePrefix,
+      String operation, String key, String config) {
+    this.threadCount = threadCount;
+    this.threadNamePrefix = threadNamePrefix;
+    this.operation = operation;
+    this.key = key;
+    this.config = config;
+  }
+
+  /**
+   * Gets a new thread pool
+   * @param threadCount
+   *        Number of threads to keep in the pool.
+   * @param threadNamePrefix
+   *        Prefix to be used for naming threads.
+   *
+   * @return
+   *        Returns a new thread pool.
+   */
+  @VisibleForTesting
+  ThreadPoolExecutor getThreadPool(int threadCount) throws Exception {
+    return new ThreadPoolExecutor(threadCount, threadCount, 2, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(), new AzureFileSystemThreadFactory(this.threadNamePrefix));
+  }
+
+  /**
+   * Execute the file operation parallel using threads. All threads works on a
+   * single working set of files stored in input 'contents'. The synchronization
+   * between multiple threads is achieved through retrieving atomic index value
+   * from the array. Once thread gets the index, it retrieves the file and initiates
+   * the file operation. The advantage with this method is that file operations
+   * doesn't get serialized due to any thread. Also, the input copy is not changed
+   * such that caller can reuse the list for other purposes.
+   *
+   * This implementation also considers that failure of operation on single file
+   * is considered as overall operation failure. All threads bail out their execution
+   * as soon as they detect any single thread either got exception or operation is failed.
+   *
+   * @param contents
+   *        List of blobs on which operation to be done.
+   * @param threadOperation
+   *        The actual operation to be executed by each thread on a file.
+   *
+   * @param operationStatus
+   *        Returns true if the operation is success, false if operation is failed.
+   * @throws IOException
+   *
+   */
+  boolean executeParallel(FileMetadata[] contents, AzureFileSystemThreadTask threadOperation) throws IOException {
+
+    boolean operationStatus = false;
+    boolean threadsEnabled = false;
+    int threadCount = this.threadCount;
+    ThreadPoolExecutor ioThreadPool = null;
+
+    // Start time for file operation
+    long start = Time.monotonicNow();
+
+    // If number of files  are less then reduce threads to file count.
+    threadCount = Math.min(contents.length, threadCount);
+
+    if (threadCount > 1) {
+      try {
+        ioThreadPool = getThreadPool(threadCount);
+        threadsEnabled = true;
+      } catch(Exception e) {
+        // The possibility of this scenario is very remote. Added this code as safety net.
+        LOG.warn("Failed to create thread pool with threads {} for operation {} on blob {}."
+            + " Use config {} to set less number of threads. Setting config value to <= 1 will disable threads.",
+            threadCount, operation, key, config);
+      }
+    } else {
+      LOG.warn("Disabling threads for {} operation as thread count {} is <= 1", operation, threadCount);
+    }
+
+    if (threadsEnabled) {
+      LOG.debug("Using thread pool for {} operation with threads {}", operation, threadCount);
+      boolean started = false;
+      AzureFileSystemThreadRunnable runnable = new AzureFileSystemThreadRunnable(contents, threadOperation, operation);
+
+      // Don't start any new requests if there is an exception from any one thread.
+      for (int i = 0; i < threadCount && runnable.lastException == null && runnable.operationStatus; i++)
+      {
+        try {
+          ioThreadPool.execute(runnable);
+          started = true;
+        } catch (RejectedExecutionException ex) {
+          // If threads can't be scheduled then report error and move ahead with next thread.
+          // Don't fail operation due to this issue.
+          LOG.error("Rejected execution of thread for {} operation on blob {}."
+              + " Continuing with existing threads. Use config {} to set less number of threads"
+              + " to avoid this error", operation, key, config);
+        }
+      }
+
+      // Stop accepting any new execute requests.
+      ioThreadPool.shutdown();
+
+      try {
+        // Wait for threads to terminate. Keep time out as large value
+        ioThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+      } catch(InterruptedException intrEx) {
+        // If current thread got interrupted then shutdown all threads now.
+        ioThreadPool.shutdownNow();
+
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        LOG.error("Threads got interrupted {} blob operation for {} "
+            , operation, key);
+      }
+
+      int threadsNotUsed = threadCount - runnable.threadsUsed.get();
+      if (threadsNotUsed > 0) {
+        LOG.warn("{} threads not used for {} operation on blob {}", threadsNotUsed, operation, key);
+      }
+
+      if (!started) {
+        // No threads started. Fall back to serial mode.
+        threadsEnabled = false;
+        LOG.info("Not able to schedule threads to {} blob {}. Fall back to {} blob serially."
+            , operation, key, operation);
+      } else {
+        IOException lastException = runnable.lastException;
+
+        // There are no exceptions from threads and no operation failures. Consider this scenario
+        // as failure only if file operations are not done on all files.
+        if (lastException == null && runnable.operationStatus && runnable.filesProcessed.get() < contents.length) {
+          LOG.error("{} failed as operation on subfolders and files failed.", operation);
+          lastException = new IOException(operation + " failed as operation on subfolders and files failed.");
+        }
+
+        if (lastException != null) {
+          // Threads started and executed. One or more threads seems to have hit exception.
+          // Raise the same exception.
+          throw lastException;
+        }
+
+        operationStatus = runnable.operationStatus;
+      }
+    }
+
+    if (!threadsEnabled) {
+      // No threads. Serialize the operation. Clear any last exceptions.
+      LOG.debug("Serializing the {} operation", operation);
+      for (int i = 0; i < contents.length; i++) {
+        if (!threadOperation.execute(contents[i])) {
+          LOG.warn("Failed to {} file {}", operation, contents[i]);
+          return false;
+        }
+      }
+
+      // Operation is success
+      operationStatus = true;
+    }
+
+    // Find the duration of time taken for file operation
+    long end = Time.monotonicNow();
+    LOG.info("Time taken for {} operation is: {} ms with threads: {}", operation, (end - start), threadCount);
+
+    return operationStatus;
+  }
+
+  /**
+   * A ThreadFactory for Azure File operation threads with meaningful names helpful
+   * for debugging purposes.
+   */
+  static class AzureFileSystemThreadFactory implements ThreadFactory {
+
+    private String threadIdPrefix = "AzureFileSystemThread";
+
+    /**
+     * Atomic integer to provide thread id for thread names.
+     */
+    private AtomicInteger threadSequenceNumber = new AtomicInteger(0);
+
+    public AzureFileSystemThreadFactory(String prefix) {
+      threadIdPrefix = prefix;
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread t = new Thread(r);
+
+      // Use current thread name as part in naming thread such that use of
+      // same file system object will have unique names.
+      t.setName(String.format("%s-%s-%d", threadIdPrefix, Thread.currentThread().getName(),
+          threadSequenceNumber.getAndIncrement()));
+      return t;
+    }
+
+  }
+
+  static class AzureFileSystemThreadRunnable implements Runnable {
+
+    // Tracks if any thread has raised exception.
+    private volatile IOException lastException = null;
+
+    // Tracks if any thread has failed execution.
+    private volatile boolean operationStatus = true;
+
+    // Atomic tracker to retrieve index of next file to be processed
+    private AtomicInteger fileIndex = new AtomicInteger(0);
+
+    // Atomic tracker to count number of files successfully processed
+    private AtomicInteger filesProcessed = new AtomicInteger(0);
+
+    // Atomic tracker to retrieve number of threads used to do at least one file operation.
+    private AtomicInteger threadsUsed = new AtomicInteger(0);
+
+    // Type of file system operation
+    private String operation = "Unknown";
+
+    // List of files to be processed.
+    private final FileMetadata[] files;
+
+    // Thread task which encapsulates the file system operation work on a file.
+    private AzureFileSystemThreadTask task;
+
+    public AzureFileSystemThreadRunnable(final FileMetadata[] files,
+        AzureFileSystemThreadTask task, String operation) {
+      this.operation = operation;
+      this.files = files;
+      this.task = task;
+    }
+
+    @Override
+    public void run() {
+      long start = Time.monotonicNow();
+      int currentIndex;
+      int processedFilesCount = 0;
+
+      while ((currentIndex = fileIndex.getAndIncrement()) < files.length) {
+        processedFilesCount++;
+        FileMetadata file = files[currentIndex];
+
+        try {
+          // Execute the file operation.
+          if (!task.execute(file)) {
+            LOG.error("{} operation failed for file {}",
+                this.operation, file.getKey());
+            operationStatus = false;
+          } else {
+            filesProcessed.getAndIncrement();
+          }
+        } catch (Exception e) {
+          LOG.error("Encountered Exception for {} operation for file {}",
+              this.operation, file.getKey());
+          lastException = new IOException("Encountered Exception for "
+              + this.operation + " operation for file " + file.getKey(), e);
+        }
+
+        // If any thread has seen exception or operation failed then we
+        // don't have to process further.
+        if (lastException != null || !operationStatus) {
+          LOG.warn("Terminating execution of {} operation now as some other thread"
+              + " already got exception or operation failed", this.operation, file.getKey());
+          break;
+        }
+      }
+
+      long end = Time.monotonicNow();
+      LOG.debug("Time taken to process {} files count for {} operation: {} ms",
+          processedFilesCount, this.operation, (end - start));
+      if (processedFilesCount > 0) {
+        threadsUsed.getAndIncrement();
+      }
+    }
+  }
+}

+ 29 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureFileSystemThreadTask.java

@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.io.IOException;
+
+/**
+ * Interface for executing the file operation by a thread.
+ */
+public interface AzureFileSystemThreadTask {
+  // Execute the operation on the file.
+  boolean execute(FileMetadata file) throws IOException;
+}

+ 39 - 14
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java

@@ -181,6 +181,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   public static final String KEY_ATOMIC_RENAME_DIRECTORIES =
       "fs.azure.atomic.rename.dir";
 
+  /**
+   * Configuration key to enable flat listing of blobs. This config is useful
+   * only if listing depth is AZURE_UNBOUNDED_DEPTH.
+   */
+  public static final String KEY_ENABLE_FLAT_LISTING = "fs.azure.flatlist.enable";
+
   /**
    * The set of directories where we should apply atomic folder rename
    * synchronized with createNonRecursive.
@@ -224,6 +230,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
   private static final int STORAGE_CONNECTION_TIMEOUT_DEFAULT = 90;
 
+  /**
+   * Enable flat listing of blobs as default option. This is useful only if
+   * listing depth is AZURE_UNBOUNDED_DEPTH.
+   */
+  public static final boolean DEFAULT_ENABLE_FLAT_LISTING = false;
 
   /**
    * MEMBER VARIABLES
@@ -1614,15 +1625,17 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
    * @param includeMetadata
    *          if set, the listed items will have their metadata populated
    *          already.
-   * 
+   * @param useFlatBlobListing
+   *          if set the list is flat, otherwise it is hierarchical.
+   *
    * @returns blobItems : iterable collection of blob items.
    * @throws URISyntaxException
    * 
    */
-  private Iterable<ListBlobItem> listRootBlobs(boolean includeMetadata)
-      throws StorageException, URISyntaxException {
+  private Iterable<ListBlobItem> listRootBlobs(boolean includeMetadata,
+      boolean useFlatBlobListing) throws StorageException, URISyntaxException {
     return rootDirectory.listBlobs(
-        null, false,
+        null, useFlatBlobListing,
         includeMetadata ?
             EnumSet.of(BlobListingDetails.METADATA) :
               EnumSet.noneOf(BlobListingDetails.class),
@@ -1642,16 +1655,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
    * @param includeMetadata
    *          if set, the listed items will have their metadata populated
    *          already.
+   * @param useFlatBlobListing
+   *          if set the list is flat, otherwise it is hierarchical.
    * 
    * @returns blobItems : iterable collection of blob items.
    * @throws URISyntaxException
    * 
    */
-  private Iterable<ListBlobItem> listRootBlobs(String aPrefix,
-      boolean includeMetadata) throws StorageException, URISyntaxException {
+  private Iterable<ListBlobItem> listRootBlobs(String aPrefix, boolean includeMetadata,
+      boolean useFlatBlobListing) throws StorageException, URISyntaxException {
 
     Iterable<ListBlobItem> list = rootDirectory.listBlobs(aPrefix,
-        false,
+        useFlatBlobListing,
         includeMetadata ?
             EnumSet.of(BlobListingDetails.METADATA) :
               EnumSet.noneOf(BlobListingDetails.class),
@@ -2049,11 +2064,19 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         prefix += PATH_DELIMITER;
       }
 
+      // Enable flat listing option only if depth is unbounded and config
+      // KEY_ENABLE_FLAT_LISTING is enabled.
+      boolean enableFlatListing = false;
+      if (maxListingDepth < 0 && sessionConfiguration.getBoolean(
+        KEY_ENABLE_FLAT_LISTING, DEFAULT_ENABLE_FLAT_LISTING)) {
+        enableFlatListing = true;
+      }
+
       Iterable<ListBlobItem> objects;
       if (prefix.equals("/")) {
-        objects = listRootBlobs(true);
+        objects = listRootBlobs(true, enableFlatListing);
       } else {
-        objects = listRootBlobs(prefix, true);
+        objects = listRootBlobs(prefix, true, enableFlatListing);
       }
 
       ArrayList<FileMetadata> fileMetadata = new ArrayList<FileMetadata>();
@@ -2121,10 +2144,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
             fileMetadata.add(directoryMetadata);
           }
 
-          // Currently at a depth of one, decrement the listing depth for
-          // sub-directories.
-          buildUpList(directory, fileMetadata, maxListingCount,
-              maxListingDepth - 1);
+          if (!enableFlatListing) {
+            // Currently at a depth of one, decrement the listing depth for
+            // sub-directories.
+            buildUpList(directory, fileMetadata, maxListingCount,
+                maxListingDepth - 1);
+          }
         }
       }
       // Note: Original code indicated that this may be a hack.
@@ -2632,7 +2657,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       }
       // Get all blob items with the given prefix from the container and delete
       // them.
-      Iterable<ListBlobItem> objects = listRootBlobs(prefix, false);
+      Iterable<ListBlobItem> objects = listRootBlobs(prefix, false, false);
       for (ListBlobItem blobItem : objects) {
         ((CloudBlob) blobItem).delete(DeleteSnapshotsOption.NONE, null, null,
             getInstrumentedContext());

+ 151 - 84
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java

@@ -59,23 +59,21 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
 import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.fs.azure.AzureException;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.util.Time;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.microsoft.azure.storage.StorageException;
 
-
-import org.apache.hadoop.io.IOUtils;
-
 /**
  * A {@link FileSystem} for reading and writing files stored on <a
  * href="http://store.azure.com/">Windows Azure</a>. This implementation is
@@ -90,6 +88,7 @@ public class NativeAzureFileSystem extends FileSystem {
    * A description of a folder rename operation, including the source and
    * destination keys, and descriptions of the files in the source folder.
    */
+
   public static class FolderRenamePending {
     private SelfRenewingLease folderLease;
     private String srcKey;
@@ -112,6 +111,7 @@ public class NativeAzureFileSystem extends FileSystem {
       ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
 
       // List all the files in the folder.
+      long start = Time.monotonicNow();
       String priorLastKey = null;
       do {
         PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL,
@@ -122,6 +122,9 @@ public class NativeAzureFileSystem extends FileSystem {
         priorLastKey = listing.getPriorLastKey();
       } while (priorLastKey != null);
       fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
+      long end = Time.monotonicNow();
+      LOG.debug("Time taken to list {} blobs for rename operation is: {} ms", fileMetadata.length, (end - start));
+
       this.committed = true;
     }
 
@@ -419,23 +422,18 @@ public class NativeAzureFileSystem extends FileSystem {
      */
     public void execute() throws IOException {
 
-      for (FileMetadata file : this.getFiles()) {
-
-        // Rename all materialized entries under the folder to point to the
-        // final destination.
-        if (file.getBlobMaterialization() == BlobMaterialization.Explicit) {
-          String srcName = file.getKey();
-          String suffix  = srcName.substring((this.getSrcKey()).length());
-          String dstName = this.getDstKey() + suffix;
-
-          // Rename gets exclusive access (via a lease) for files
-          // designated for atomic rename.
-          // The main use case is for HBase write-ahead log (WAL) and data
-          // folder processing correctness.  See the rename code for details.
-          boolean acquireLease = fs.getStoreInterface().isAtomicRenameKey(srcName);
-          fs.getStoreInterface().rename(srcName, dstName, acquireLease, null);
+      AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() {
+        @Override
+        public boolean execute(FileMetadata file) throws IOException{
+          renameFile(file);
+          return true;
         }
-      }
+      };
+
+      AzureFileSystemThreadPoolExecutor executor = this.fs.getThreadPoolExecutor(this.fs.renameThreadCount,
+          "AzureBlobRenameThread", "Rename", getSrcKey(), AZURE_RENAME_THREADS);
+
+      executor.executeParallel(this.getFiles(), task);
 
       // Rename the source folder 0-byte root file itself.
       FileMetadata srcMetadata2 = this.getSourceMetadata();
@@ -454,6 +452,25 @@ public class NativeAzureFileSystem extends FileSystem {
       fs.updateParentFolderLastModifiedTime(dstKey);
     }
 
+    // Rename a single file
+    @VisibleForTesting
+    void renameFile(FileMetadata file) throws IOException{
+      // Rename all materialized entries under the folder to point to the
+      // final destination.
+      if (file.getBlobMaterialization() == BlobMaterialization.Explicit) {
+        String srcName = file.getKey();
+        String suffix  = srcName.substring((this.getSrcKey()).length());
+        String dstName = this.getDstKey() + suffix;
+
+        // Rename gets exclusive access (via a lease) for files
+        // designated for atomic rename.
+        // The main use case is for HBase write-ahead log (WAL) and data
+        // folder processing correctness.  See the rename code for details.
+        boolean acquireLease = this.fs.getStoreInterface().isAtomicRenameKey(srcName);
+        this.fs.getStoreInterface().rename(srcName, dstName, acquireLease, null);
+      }
+    }
+
     /** Clean up after execution of rename.
      * @throws IOException */
     public void cleanup() throws IOException {
@@ -662,6 +679,36 @@ public class NativeAzureFileSystem extends FileSystem {
    */
   public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support";
 
+  /**
+   * The configuration property to set number of threads to be used for rename operation.
+   */
+  public static final String AZURE_RENAME_THREADS = "fs.azure.rename.threads";
+
+  /**
+   * The default number of threads to be used for rename operation.
+   */
+  public static final int DEFAULT_AZURE_RENAME_THREADS = 0;
+
+  /**
+   * The configuration property to set number of threads to be used for delete operation.
+   */
+  public static final String AZURE_DELETE_THREADS = "fs.azure.delete.threads";
+
+  /**
+   * The default number of threads to be used for delete operation.
+   */
+  public static final int DEFAULT_AZURE_DELETE_THREADS = 0;
+
+  /**
+   * The number of threads to be used for delete operation after reading user configuration.
+   */
+  private int deleteThreadCount = 0;
+
+  /**
+   * The number of threads to be used for rename operation after reading user configuration.
+   */
+  private int renameThreadCount = 0;
+
   private class NativeAzureFsInputStream extends FSInputStream {
     private InputStream in;
     private final String key;
@@ -1172,6 +1219,9 @@ public class NativeAzureFileSystem extends FileSystem {
     LOG.debug("  blockSize  = {}",
         conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
 
+    // Initialize thread counts from user configuration
+    deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS);
+    renameThreadCount = conf.getInt(AZURE_RENAME_THREADS, DEFAULT_AZURE_RENAME_THREADS);
   }
 
   private NativeFileSystemStore createDefaultStore(Configuration conf) {
@@ -1779,77 +1829,65 @@ public class NativeAzureFileSystem extends FileSystem {
 
       // List all the blobs in the current folder.
       String priorLastKey = null;
-      PartialListing listing = null;
-      try {
-        listing = store.listAll(key, AZURE_LIST_ALL, 1,
-            priorLastKey);
-      } catch(IOException e) {
-
-        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
-
-        if (innerException instanceof StorageException
-            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
-          return false;
-        }
-
-        throw e;
-      }
-
-      if (listing == null) {
-        return false;
-      }
 
-      FileMetadata[] contents = listing.getFiles();
-      if (!recursive && contents.length > 0) {
-        // The folder is non-empty and recursive delete was not specified.
-        // Throw an exception indicating that a non-recursive delete was
-        // specified for a non-empty folder.
-        throw new IOException("Non-recursive delete of non-empty directory "
-            + f.toString());
-      }
-
-      // Delete all the files in the folder.
-      for (FileMetadata p : contents) {
-        // Tag on the directory name found as the suffix of the suffix of the
-        // parent directory to get the new absolute path.
-        String suffix = p.getKey().substring(
-            p.getKey().lastIndexOf(PATH_DELIMITER));
-        if (!p.isDir()) {
-          try {
-            store.delete(key + suffix);
-            instrumentation.fileDeleted();
-          } catch(IOException e) {
-
-            Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
-
-            if (innerException instanceof StorageException
-                && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
-              return false;
-            }
+      // Start time for list operation
+      long start = Time.monotonicNow();
+      ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
 
-            throw e;
+      // List all the files in the folder with AZURE_UNBOUNDED_DEPTH depth.
+      do {
+        try {
+          PartialListing listing = store.listAll(key, AZURE_LIST_ALL,
+            AZURE_UNBOUNDED_DEPTH, priorLastKey);
+          for(FileMetadata file : listing.getFiles()) {
+            fileMetadataList.add(file);
           }
-        } else {
-          // Recursively delete contents of the sub-folders. Notice this also
-          // deletes the blob for the directory.
-          if (!delete(new Path(f.toString() + suffix), true)) {
+          priorLastKey = listing.getPriorLastKey();
+        } catch (IOException e) {
+          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
+
+          if (innerException instanceof StorageException
+              && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
             return false;
           }
+
+          throw e;
         }
-      }
+      } while (priorLastKey != null);
 
-      try {
-        store.delete(key);
-      } catch(IOException e) {
+      long end = Time.monotonicNow();
+      LOG.debug("Time taken to list {} blobs for delete operation: {} ms", fileMetadataList.size(), (end - start));
 
-        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
+      final FileMetadata[] contents = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
 
-        if (innerException instanceof StorageException
-            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
-          return false;
+      if (!recursive && contents.length > 0) {
+          // The folder is non-empty and recursive delete was not specified.
+          // Throw an exception indicating that a non-recursive delete was
+          // specified for a non-empty folder.
+          throw new IOException("Non-recursive delete of non-empty directory "
+              + f.toString());
+      }
+
+      // Delete all files / folders in current directory stored as list in 'contents'.
+      AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() {
+        @Override
+        public boolean execute(FileMetadata file) throws IOException{
+          return deleteFile(file.getKey(), file.isDir());
         }
+      };
 
-        throw e;
+      AzureFileSystemThreadPoolExecutor executor = getThreadPoolExecutor(this.deleteThreadCount,
+          "AzureBlobDeleteThread", "Delete", key, AZURE_DELETE_THREADS);
+
+      if (!executor.executeParallel(contents, task)) {
+        LOG.error("Failed to delete files / subfolders in blob {}", key);
+        return false;
+      }
+
+      // Delete the current directory
+      if (!deleteFile(metaFile.getKey(), metaFile.isDir())) {
+        LOG.error("Failed delete directory {}", f.toString());
+        return false;
       }
 
       // Update parent directory last modified time
@@ -1859,7 +1897,6 @@ public class NativeAzureFileSystem extends FileSystem {
           updateParentFolderLastModifiedTime(key);
         }
       }
-      instrumentation.directoryDeleted();
     }
 
     // File or directory was successfully deleted.
@@ -1867,6 +1904,35 @@ public class NativeAzureFileSystem extends FileSystem {
     return true;
   }
 
+  public AzureFileSystemThreadPoolExecutor getThreadPoolExecutor(int threadCount,
+      String threadNamePrefix, String operation, String key, String config) {
+    return new AzureFileSystemThreadPoolExecutor(threadCount, threadNamePrefix, operation, key, config);
+  }
+
+  // Delete single file / directory from key.
+  @VisibleForTesting
+  boolean deleteFile(String key, boolean isDir) throws IOException {
+    try {
+      store.delete(key);
+      if (isDir) {
+        instrumentation.directoryDeleted();
+      } else {
+        instrumentation.fileDeleted();
+      }
+    } catch(IOException e) {
+      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
+
+      if (innerException instanceof StorageException
+          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+        return false;
+      }
+
+      throw e;
+    }
+
+    return true;
+  }
+
   @Override
   public FileStatus getFileStatus(Path f) throws FileNotFoundException, IOException {
 
@@ -2517,7 +2583,8 @@ public class NativeAzureFileSystem extends FileSystem {
    * @param dstKey Destination folder name.
    * @throws IOException
    */
-  private FolderRenamePending prepareAtomicFolderRename(
+  @VisibleForTesting
+  FolderRenamePending prepareAtomicFolderRename(
       String srcKey, String dstKey) throws IOException {
 
     if (store.isAtomicRenameKey(srcKey)) {

+ 19 - 0
hadoop-tools/hadoop-azure/src/site/markdown/index.md

@@ -24,6 +24,7 @@
     * [Atomic Folder Rename](#Atomic_Folder_Rename)
     * [Accessing wasb URLs](#Accessing_wasb_URLs)
     * [Append API Support and Configuration](#Append_API_Support_and_Configuration)
+    * [Multithread Support](#Multithread_Support)
 * [Testing the hadoop-azure Module](#Testing_the_hadoop-azure_Module)
 
 ## <a name="Introduction" />Introduction
@@ -266,6 +267,24 @@ It becomes a responsibility of the application either to ensure single-threaded
 file path, or rely on some external locking mechanism of its own.  Failure to do so will result in
 unexpected behavior.
 
+### <a name="Multithread_Support" />Multithread Support
+
+Rename and Delete blob operations on directories with large number of files and sub directories currently is very slow as these operations are done one blob at a time serially. These files and sub folders can be deleted or renamed parallel. Following configurations can be used to enable threads to do parallel processing
+
+To enable 10 threads for Delete operation. Set configuration value to 0 or 1 to disable threads. The default behavior is threads disabled.
+
+    <property>
+      <name>fs.azure.delete.threads</name>
+      <value>10</value>
+    </property>
+
+To enable 20 threads for Rename operation. Set configuration value to 0 or 1 to disable threads. The default behavior is threads disabled.
+
+    <property>
+      <name>fs.azure.rename.threads</name>
+      <value>20</value>
+    </property>
+
 ## <a name="Testing_the_hadoop-azure_Module" />Testing the hadoop-azure Module
 
 The hadoop-azure module includes a full suite of unit tests.  Most of the tests

+ 720 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestFileSystemOperationsWithThreads.java

@@ -0,0 +1,720 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.apache.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+
+/**
+ * Tests the Native Azure file system (WASB) using parallel threads for rename and delete operations.
+ */
+public class TestFileSystemOperationsWithThreads extends AbstractWasbTestBase {
+
+  private final int renameThreads = 10;
+  private final int deleteThreads = 20;
+  private int iterations = 1;
+  private LogCapturer logs = null;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = fs.getConf();
+
+    // By default enable parallel threads for rename and delete operations.
+    // Also enable flat listing of blobs for these operations.
+    conf.setInt(NativeAzureFileSystem.AZURE_RENAME_THREADS, renameThreads);
+    conf.setInt(NativeAzureFileSystem.AZURE_DELETE_THREADS, deleteThreads);
+    conf.setBoolean(AzureNativeFileSystemStore.KEY_ENABLE_FLAT_LISTING, true);
+
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+
+    // Capture logs
+    logs = LogCapturer.captureLogs(new Log4JLogger(Logger
+        .getRootLogger()));
+  }
+
+  /*
+   * Helper method to create sub directory and different types of files
+   * for multiple iterations.
+   */
+  private void createFolder(FileSystem fs, String root) throws Exception {
+    fs.mkdirs(new Path(root));
+    for (int i = 0; i < this.iterations; i++) {
+      fs.mkdirs(new Path(root + "/" + i));
+      fs.createNewFile(new Path(root + "/" + i + "/fileToRename"));
+      fs.createNewFile(new Path(root + "/" + i + "/file/to/rename"));
+      fs.createNewFile(new Path(root + "/" + i + "/file+to%rename"));
+      fs.createNewFile(new Path(root + "/fileToRename" + i));
+    }
+  }
+
+  /*
+   * Helper method to do rename operation and validate all files in source folder
+   * doesn't exists and similar files exists in new folder.
+   */
+  private void validateRenameFolder(FileSystem fs, String source, String dest) throws Exception {
+    // Create source folder with files.
+    createFolder(fs, source);
+    Path sourceFolder = new Path(source);
+    Path destFolder = new Path(dest);
+
+    // rename operation
+    assertTrue(fs.rename(sourceFolder, destFolder));
+    assertTrue(fs.exists(destFolder));
+
+    for (int i = 0; i < this.iterations; i++) {
+      // Check destination folder and files exists.
+      assertTrue(fs.exists(new Path(dest + "/" + i)));
+      assertTrue(fs.exists(new Path(dest + "/" + i + "/fileToRename")));
+      assertTrue(fs.exists(new Path(dest + "/" + i + "/file/to/rename")));
+      assertTrue(fs.exists(new Path(dest + "/" + i + "/file+to%rename")));
+      assertTrue(fs.exists(new Path(dest + "/fileToRename" + i)));
+
+      // Check source folder and files doesn't exists.
+      assertFalse(fs.exists(new Path(source + "/" + i)));
+      assertFalse(fs.exists(new Path(source + "/" + i + "/fileToRename")));
+      assertFalse(fs.exists(new Path(source + "/" + i + "/file/to/rename")));
+      assertFalse(fs.exists(new Path(source + "/" + i + "/file+to%rename")));
+      assertFalse(fs.exists(new Path(source + "/fileToRename" + i)));
+    }
+  }
+
+  /*
+   * Test case for rename operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testRenameSmallFolderWithThreads() throws Exception {
+
+    validateRenameFolder(fs, "root", "rootnew");
+
+    // With single iteration, we would have created 7 blobs.
+    int expectedThreadsCreated = Math.min(7, renameThreads);
+
+    // Validate from logs that threads are created.
+    String content = logs.getOutput();
+    assertTrue(content.contains("ms with threads: " + expectedThreadsCreated));
+
+    // Validate thread executions
+    for (int i = 0; i < expectedThreadsCreated; i++) {
+      assertTrue(content.contains("AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i));
+    }
+
+    // Also ensure that we haven't spawned extra threads.
+    if (expectedThreadsCreated < renameThreads) {
+      for (int i = expectedThreadsCreated; i < renameThreads; i++) {
+        assertFalse(content.contains("AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i));
+      }
+    }
+  }
+
+  /*
+   * Test case for rename operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testRenameLargeFolderWithThreads() throws Exception {
+
+    // Populate source folder with large number of files and directories.
+    this.iterations = 10;
+    validateRenameFolder(fs, "root", "rootnew");
+
+    // Validate from logs that threads are created.
+    String content = logs.getOutput();
+    assertTrue(content.contains("ms with threads: " + renameThreads));
+
+    // Validate thread executions
+    for (int i = 0; i < renameThreads; i++) {
+      assertTrue(content.contains("AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i));
+    }
+  }
+
+  /*
+   * Test case for rename operation with threads disabled and flat listing enabled.
+   */
+  @Test
+  public void testRenameLargeFolderDisableThreads() throws Exception {
+    Configuration conf = fs.getConf();
+
+    // Number of threads set to 0 or 1 disables threads.
+    conf.setInt(NativeAzureFileSystem.AZURE_RENAME_THREADS, 0);
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+
+    // Populate source folder with large number of files and directories.
+    this.iterations = 10;
+    validateRenameFolder(fs, "root", "rootnew");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertTrue(content.contains("Disabling threads for Rename operation as thread count 0"));
+
+    // Validate no thread executions
+    for (int i = 0; i < renameThreads; i++) {
+      assertFalse(content.contains("AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i));
+    }
+  }
+
+  /*
+   * Test case for rename operation with threads and flat listing disabled.
+   */
+  @Test
+  public void testRenameSmallFolderDisableThreadsDisableFlatListing() throws Exception {
+    Configuration conf = fs.getConf();
+    conf = fs.getConf();
+
+    // Number of threads set to 0 or 1 disables threads.
+    conf.setInt(NativeAzureFileSystem.AZURE_RENAME_THREADS, 1);
+    conf.setBoolean(AzureNativeFileSystemStore.KEY_ENABLE_FLAT_LISTING, false);
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+
+    validateRenameFolder(fs, "root", "rootnew");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertTrue(content.contains("Disabling threads for Rename operation as thread count 1"));
+
+    // Validate no thread executions
+    for (int i = 0; i < renameThreads; i++) {
+      assertFalse(content.contains("AzureBlobRenameThread-" + Thread.currentThread().getName() + "-" + i));
+    }
+  }
+
+  /*
+   * Helper method to do delete operation and validate all files in source folder
+   * doesn't exists after delete operation.
+   */
+  private void validateDeleteFolder(FileSystem fs, String source)  throws Exception {
+    // Create folder with files.
+    createFolder(fs, "root");
+    Path sourceFolder = new Path(source);
+
+    // Delete operation
+    assertTrue(fs.delete(sourceFolder, true));
+    assertFalse(fs.exists(sourceFolder));
+
+    for (int i = 0; i < this.iterations; i++) {
+      // check that source folder and files doesn't exists
+      assertFalse(fs.exists(new Path(source + "/" + i)));
+      assertFalse(fs.exists(new Path(source + "/" + i + "/fileToRename")));
+      assertFalse(fs.exists(new Path(source + "/" + i + "/file/to/rename")));
+      assertFalse(fs.exists(new Path(source + "/" + i + "/file+to%rename")));
+      assertFalse(fs.exists(new Path(source + "/fileToRename" + i)));
+    }
+  }
+
+  /*
+   * Test case for delete operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testDeleteSmallFolderWithThreads() throws Exception {
+
+    validateDeleteFolder(fs, "root");
+
+    // With single iteration, we would have created 7 blobs.
+    int expectedThreadsCreated = Math.min(7, deleteThreads);
+
+    // Validate from logs that threads are enabled.
+    String content = logs.getOutput();
+    assertTrue(content.contains("ms with threads: " + expectedThreadsCreated));
+
+    // Validate thread executions
+    for (int i = 0; i < expectedThreadsCreated; i++) {
+      assertTrue(content.contains("AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i));
+    }
+
+    // Also ensure that we haven't spawned extra threads.
+    if (expectedThreadsCreated < deleteThreads) {
+      for (int i = expectedThreadsCreated; i < deleteThreads; i++) {
+        assertFalse(content.contains("AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i));
+      }
+    }
+  }
+
+  /*
+   * Test case for delete operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testDeleteLargeFolderWithThreads() throws Exception {
+    // Populate source folder with large number of files and directories.
+    this.iterations = 10;
+    validateDeleteFolder(fs, "root");
+
+    // Validate from logs that threads are enabled.
+    String content = logs.getOutput();
+    assertTrue(content.contains("ms with threads: " + deleteThreads));
+
+    // Validate thread executions
+    for (int i = 0; i < deleteThreads; i++) {
+      assertTrue(content.contains("AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i));
+    }
+  }
+
+  /*
+   * Test case for delete operation with threads disabled and flat listing enabled.
+   */
+  @Test
+  public void testDeleteLargeFolderDisableThreads() throws Exception {
+    Configuration conf = fs.getConf();
+    conf.setInt(NativeAzureFileSystem.AZURE_DELETE_THREADS, 0);
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+
+    // Populate source folder with large number of files and directories.
+    this.iterations = 10;
+    validateDeleteFolder(fs, "root");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertTrue(content.contains("Disabling threads for Delete operation as thread count 0"));
+
+    // Validate no thread executions
+    for (int i = 0; i < deleteThreads; i++) {
+      assertFalse(content.contains("AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i));
+    }
+  }
+
+  /*
+   * Test case for rename operation with threads and flat listing disabled.
+   */
+  @Test
+  public void testDeleteSmallFolderDisableThreadsDisableFlatListing() throws Exception {
+    Configuration conf = fs.getConf();
+
+    // Number of threads set to 0 or 1 disables threads.
+    conf.setInt(NativeAzureFileSystem.AZURE_DELETE_THREADS, 1);
+    conf.setBoolean(AzureNativeFileSystemStore.KEY_ENABLE_FLAT_LISTING, false);
+    URI uri = fs.getUri();
+    fs.initialize(uri, conf);
+
+    validateDeleteFolder(fs, "root");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertTrue(content.contains("Disabling threads for Delete operation as thread count 1"));
+
+    // Validate no thread executions
+    for (int i = 0; i < deleteThreads; i++) {
+      assertFalse(content.contains("AzureBlobDeleteThread-" + Thread.currentThread().getName() + "-" + i));
+    }
+  }
+
+  /*
+   * Test case for delete operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testDeleteThreadPoolExceptionFailure() throws Exception {
+
+    // Spy azure file system object and raise exception for new thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+            path, NativeAzureFileSystem.AZURE_DELETE_THREADS));
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenThrow(new Exception());
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+        path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+    validateDeleteFolder(mockFs, "root");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertTrue(content.contains("Failed to create thread pool with threads"));
+    assertTrue(content.contains("Serializing the Delete operation"));
+  }
+
+  /*
+   * Test case for delete operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testDeleteThreadPoolExecuteFailure() throws Exception {
+
+    // Mock thread pool executor to throw exception for all requests.
+    ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
+    Mockito.doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+
+    // Spy azure file system object and return mocked thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+            path, NativeAzureFileSystem.AZURE_DELETE_THREADS));
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+        path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+    validateDeleteFolder(mockFs, "root");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertTrue(content.contains("Rejected execution of thread for Delete operation on blob"));
+    assertTrue(content.contains("Serializing the Delete operation"));
+  }
+
+  /*
+   * Test case for delete operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testDeleteThreadPoolExecuteSingleThreadFailure() throws Exception {
+
+    // Spy azure file system object and return mocked thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    // Spy a thread pool executor and link it to azure file system object.
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+            path, NativeAzureFileSystem.AZURE_DELETE_THREADS));
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+        path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+    // Create a thread executor and link it to mocked thread pool executor object.
+    ThreadPoolExecutor mockThreadExecutor = Mockito.spy(mockThreadPoolExecutor.getThreadPool(7));
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+    // Mock thread executor to throw exception for all requests.
+    Mockito.doCallRealMethod().doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+
+    validateDeleteFolder(mockFs, "root");
+
+    // Validate from logs that threads are enabled and unused threads.
+    String content = logs.getOutput();
+    assertTrue(content.contains("Using thread pool for Delete operation with threads 7"));
+    assertTrue(content.contains("6 threads not used for Delete operation on blob"));
+  }
+
+  /*
+   * Test case for delete operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testDeleteThreadPoolTerminationFailure() throws Exception {
+
+    // Spy azure file system object and return mocked thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    // Spy a thread pool executor and link it to azure file system object.
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        ((NativeAzureFileSystem) fs).getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+            path, NativeAzureFileSystem.AZURE_DELETE_THREADS));
+
+    // Create a thread executor and link it to mocked thread pool executor object.
+    // Mock thread executor to throw exception for terminating threads.
+    ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
+    Mockito.doNothing().when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+    Mockito.when(mockThreadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)).thenThrow(new InterruptedException());
+
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.when(mockFs.getThreadPoolExecutor(deleteThreads, "AzureBlobDeleteThread", "Delete",
+        path, NativeAzureFileSystem.AZURE_DELETE_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+    createFolder(mockFs, "root");
+    Path sourceFolder = new Path("root");
+    boolean exception = false;
+    try {
+      mockFs.delete(sourceFolder, true);
+    } catch (IOException e){
+      exception = true;
+    }
+
+    assertTrue(exception);
+    assertTrue(mockFs.exists(sourceFolder));
+
+    // Validate from logs that threads are enabled and delete operation is failed.
+    String content = logs.getOutput();
+    assertTrue(content.contains("Using thread pool for Delete operation with threads"));
+    assertTrue(content.contains("Threads got interrupted Delete blob operation"));
+    assertTrue(content.contains("Delete failed as operation on subfolders and files failed."));
+  }
+
+  /*
+   * Test case for delete operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testDeleteSingleDeleteFailure() throws Exception {
+
+    // Spy azure file system object and return false for deleting one file
+    LOG.info("testDeleteSingleDeleteFailure");
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root/0")));
+    Mockito.when(mockFs.deleteFile(path, true)).thenReturn(false);
+
+    createFolder(mockFs, "root");
+    Path sourceFolder = new Path("root");
+    assertFalse(mockFs.delete(sourceFolder, true));
+    assertTrue(mockFs.exists(sourceFolder));
+
+    // Validate from logs that threads are enabled and delete operation failed.
+    String content = logs.getOutput();
+    assertTrue(content.contains("Using thread pool for Delete operation with threads"));
+    assertTrue(content.contains("Delete operation failed for file " + path));
+    assertTrue(content.contains("Terminating execution of Delete operation now as some other thread already got exception or operation failed"));
+    assertTrue(content.contains("Failed to delete files / subfolders in blob"));
+  }
+
+  /*
+   * Test case for delete operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testDeleteSingleDeleteException() throws Exception {
+
+    // Spy azure file system object and raise exception for deleting one file
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root/0")));
+    Mockito.doThrow(new IOException()).when(mockFs).deleteFile(path, true);
+
+    createFolder(mockFs, "root");
+    Path sourceFolder = new Path("root");
+
+    boolean exception = false;
+    try {
+      mockFs.delete(sourceFolder, true);
+    } catch (IOException e){
+      exception = true;
+    }
+
+    assertTrue(exception);
+    assertTrue(mockFs.exists(sourceFolder));
+
+    // Validate from logs that threads are enabled and delete operation failed.
+    String content = logs.getOutput();
+    assertTrue(content.contains("Using thread pool for Delete operation with threads"));
+    assertTrue(content.contains("Encountered Exception for Delete operation for file " + path));
+    assertTrue(content.contains("Terminating execution of Delete operation now as some other thread already got exception or operation failed"));
+  }
+
+  /*
+   * Test case for rename operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testRenameThreadPoolExceptionFailure() throws Exception {
+
+    // Spy azure file system object and raise exception for new thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        ((NativeAzureFileSystem) fs).getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+            path, NativeAzureFileSystem.AZURE_RENAME_THREADS));
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenThrow(new Exception());
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.doReturn(mockThreadPoolExecutor).when(mockFs).getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+        path, NativeAzureFileSystem.AZURE_RENAME_THREADS);
+
+    validateRenameFolder(mockFs, "root", "rootnew");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertTrue(content.contains("Failed to create thread pool with threads"));
+    assertTrue(content.contains("Serializing the Rename operation"));
+  }
+
+  /*
+   * Test case for rename operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testRenameThreadPoolExecuteFailure() throws Exception {
+
+    // Mock thread pool executor to throw exception for all requests.
+    ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
+    Mockito.doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+
+    // Spy azure file system object and return mocked thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+            path, NativeAzureFileSystem.AZURE_RENAME_THREADS));
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.when(mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+        path, NativeAzureFileSystem.AZURE_RENAME_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+    validateRenameFolder(mockFs, "root", "rootnew");
+
+    // Validate from logs that threads are disabled.
+    String content = logs.getOutput();
+    assertTrue(content.contains("Rejected execution of thread for Rename operation on blob"));
+    assertTrue(content.contains("Serializing the Rename operation"));
+  }
+
+  /*
+   * Test case for rename operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testRenameThreadPoolExecuteSingleThreadFailure() throws Exception {
+
+    // Spy azure file system object and return mocked thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    // Spy a thread pool executor and link it to azure file system object.
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+            path, NativeAzureFileSystem.AZURE_RENAME_THREADS));
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.when(mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+        path, NativeAzureFileSystem.AZURE_RENAME_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+    // Create a thread executor and link it to mocked thread pool executor object.
+    ThreadPoolExecutor mockThreadExecutor = Mockito.spy(mockThreadPoolExecutor.getThreadPool(7));
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+    // Mock thread executor to throw exception for all requests.
+    Mockito.doCallRealMethod().doThrow(new RejectedExecutionException()).when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+
+    validateRenameFolder(mockFs, "root", "rootnew");
+
+    // Validate from logs that threads are enabled and unused threads exists.
+    String content = logs.getOutput();
+    assertTrue(content.contains("Using thread pool for Rename operation with threads 7"));
+    assertTrue(content.contains("6 threads not used for Rename operation on blob"));
+  }
+
+  /*
+   * Test case for rename operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testRenameThreadPoolTerminationFailure() throws Exception {
+
+    // Spy azure file system object and return mocked thread pool
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    // Spy a thread pool executor and link it to azure file system object.
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root")));
+    AzureFileSystemThreadPoolExecutor mockThreadPoolExecutor = Mockito.spy(
+        mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+            path, NativeAzureFileSystem.AZURE_RENAME_THREADS));
+
+    // With single iteration, we would have created 7 blobs resulting 7 threads.
+    Mockito.when(mockFs.getThreadPoolExecutor(renameThreads, "AzureBlobRenameThread", "Rename",
+        path, NativeAzureFileSystem.AZURE_RENAME_THREADS)).thenReturn(mockThreadPoolExecutor);
+
+    // Mock thread executor to throw exception for all requests.
+    ThreadPoolExecutor mockThreadExecutor = Mockito.mock(ThreadPoolExecutor.class);
+    Mockito.doNothing().when(mockThreadExecutor).execute(Mockito.any(Runnable.class));
+    Mockito.when(mockThreadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)).thenThrow(new InterruptedException());
+    Mockito.when(mockThreadPoolExecutor.getThreadPool(7)).thenReturn(mockThreadExecutor);
+
+
+    createFolder(mockFs, "root");
+    Path sourceFolder = new Path("root");
+    Path destFolder = new Path("rootnew");
+    boolean exception = false;
+    try {
+      mockFs.rename(sourceFolder, destFolder);
+    } catch (IOException e){
+      exception = true;
+    }
+
+    assertTrue(exception);
+    assertTrue(mockFs.exists(sourceFolder));
+
+    // Validate from logs that threads are enabled and rename operation is failed.
+    String content = logs.getOutput();
+    assertTrue(content.contains("Using thread pool for Rename operation with threads"));
+    assertTrue(content.contains("Threads got interrupted Rename blob operation"));
+    assertTrue(content.contains("Rename failed as operation on subfolders and files failed."));
+  }
+
+  /*
+   * Test case for rename operation with multiple threads and flat listing enabled.
+   */
+  @Test
+  public void testRenameSingleRenameException() throws Exception {
+
+    // Spy azure file system object and raise exception for deleting one file
+    Path sourceFolder = new Path("root");
+    Path destFolder = new Path("rootnew");
+
+    // Spy azure file system object and populate rename pending spy object.
+    NativeAzureFileSystem mockFs = Mockito.spy((NativeAzureFileSystem) fs);
+
+    // Populate data now only such that rename pending spy object would see this data.
+    createFolder(mockFs, "root");
+
+    String srcKey = mockFs.pathToKey(mockFs.makeAbsolute(sourceFolder));
+    String dstKey = mockFs.pathToKey(mockFs.makeAbsolute(destFolder));
+
+    FolderRenamePending mockRenameFs = Mockito.spy(mockFs.prepareAtomicFolderRename(srcKey, dstKey));
+    Mockito.when(mockFs.prepareAtomicFolderRename(srcKey, dstKey)).thenReturn(mockRenameFs);
+    String path = mockFs.pathToKey(mockFs.makeAbsolute(new Path("root/0")));
+    Mockito.doThrow(new IOException()).when(mockRenameFs).renameFile(Mockito.any(FileMetadata.class));
+
+    boolean exception = false;
+    try {
+      mockFs.rename(sourceFolder, destFolder);
+    } catch (IOException e){
+      exception = true;
+    }
+
+    assertTrue(exception);
+    assertTrue(mockFs.exists(sourceFolder));
+
+    // Validate from logs that threads are enabled and delete operation failed.
+    String content = logs.getOutput();
+    assertTrue(content.contains("Using thread pool for Rename operation with threads"));
+    assertTrue(content.contains("Encountered Exception for Rename operation for file " + path));
+    assertTrue(content.contains("Terminating execution of Rename operation now as some other thread already got exception or operation failed"));
+  }
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+
+}

+ 3 - 1
hadoop-tools/hadoop-azure/src/test/resources/log4j.properties

@@ -20,4 +20,6 @@ log4j.rootLogger=INFO,stdout
 log4j.threshold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t]: %c{2} (%F:%M(%L)) - %m%n
+
+log4j.logger.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor=DEBUG