Преглед изворног кода

HADOOP-12508. delete fails with exception when lease is held on blob. Contributed by Gaurav Kanade.

(cherry picked from commit 9e7dcab185abf2fdabb28f2799b9952b5664a4b0)
cnauroth пре 9 година
родитељ
комит
c1d7b26e94

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -702,6 +702,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12533. Introduce FileNotFoundException in WASB for read and seek API.
     (Dushyanth via cnauroth)
 
+    HADOOP-12508. delete fails with exception when lease is held on blob.
+    (Gaurav Kanade via cnauroth)
+
   OPTIMIZATIONS
 
     HADOOP-12051. ProtobufRpcEngine.invoke() should use Exception.toString()

+ 31 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java

@@ -2370,7 +2370,37 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
   @Override
   public void delete(String key) throws IOException {
-    delete(key, null);
+    try {
+      delete(key, null);
+    } catch (IOException e) {
+      Throwable t = e.getCause();
+      if(t != null && t instanceof StorageException) {
+        StorageException se = (StorageException) t;
+        if(se.getErrorCode().equals(("LeaseIdMissing"))){
+          SelfRenewingLease lease = null;
+          try {
+            lease = acquireLease(key);
+            delete(key, lease);
+          } catch (AzureException e3) {
+            LOG.warn("Got unexpected exception trying to acquire lease on "
+                + key + "." + e3.getMessage());
+            throw e3;
+          } finally {
+            try {
+              if(lease != null){
+                lease.free();
+              }
+            } catch (Exception e4){
+              LOG.error("Unable to free lease on " + key, e4);
+            }
+          }
+        } else {
+          throw e;
+        }
+      } else {
+        throw e;
+      }
+    }
   }
 
   @Override

+ 4 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SelfRenewingLease.java

@@ -22,6 +22,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import com.microsoft.azure.storage.AccessCondition;
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.CloudBlob;
@@ -61,7 +63,8 @@ public class SelfRenewingLease {
 
 
   // Time to wait to retry getting the lease in milliseconds
-  private static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000;
+  @VisibleForTesting
+  static final int LEASE_ACQUIRE_RETRY_INTERVAL = 2000;
 
   public SelfRenewingLease(CloudBlobWrapper blobWrapper)
       throws StorageException {

+ 86 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFileSystemLive.java

@@ -21,10 +21,16 @@ package org.apache.hadoop.fs.azure;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+
 import org.junit.Test;
 
+import com.microsoft.azure.storage.StorageException;
+
 /*
  * Tests the Native Azure file system (WASB) against an actual blob store if
  * provided in the environment.
@@ -37,6 +43,86 @@ public class TestNativeAzureFileSystemLive extends
     return AzureBlobStorageTestAccount.create();
   }
 
+  /**
+   * Tests that store.delete() removes a blob while another thread is holding
+   * a lease on it. The delete is first attempted without a lease; when that
+   * fails, delete is expected to acquire the lease itself and try again.
+   * This is a scenario that would happen in HMaster startup when it tries to
+   * clean up the temp dirs while the HMaster process which was killed earlier
+   * held a lease on the blob when doing some DDL operation.
+   */
+  @Test
+  public void testDeleteThrowsExceptionWithLeaseExistsErrorMessage()
+      throws Exception {
+    LOG.info("Starting test");
+    final String FILE_KEY = "fileWithLease";
+    // Create the file
+    Path path = new Path(FILE_KEY);
+    fs.create(path);
+    assertTrue(fs.exists(path));
+    NativeAzureFileSystem nfs = (NativeAzureFileSystem)fs;
+    final String fullKey = nfs.pathToKey(nfs.makeAbsolute(path));
+    final AzureNativeFileSystemStore store = nfs.getStore();
+
+    // Acquire the lease on the file in a background thread
+    final CountDownLatch leaseAttemptComplete = new CountDownLatch(1);
+    final CountDownLatch beginningDeleteAttempt = new CountDownLatch(1);
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        // Acquire the lease and then signal the main test thread.
+        SelfRenewingLease lease = null;
+        try {
+          lease = store.acquireLease(fullKey);
+          LOG.info("Lease acquired: " + lease.getLeaseID());
+        } catch (AzureException e) {
+          LOG.warn("Lease acqusition thread unable to acquire lease", e);
+        } finally {
+          leaseAttemptComplete.countDown();
+        }
+
+        // Wait for the main test thread to signal it will attempt the delete.
+        try {
+          beginningDeleteAttempt.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+
+        // Keep holding the lease past the lease acquisition retry interval, so
+        // the test covers the case of delete retrying to acquire the lease.
+        try {
+          Thread.sleep(SelfRenewingLease.LEASE_ACQUIRE_RETRY_INTERVAL * 3);
+        } catch (InterruptedException ex) {
+          Thread.currentThread().interrupt();
+        }
+
+        try {
+          if (lease != null){
+            LOG.info("Freeing lease");
+            lease.free();
+          }
+        } catch (StorageException se) {
+          LOG.warn("Unable to free lease.", se);
+        }
+      }
+    };
+
+    // Start the background thread and wait for it to signal the lease is held.
+    t.start();
+    try {
+      leaseAttemptComplete.await();
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    // Try to delete the same file
+    beginningDeleteAttempt.countDown();
+    store.delete(fullKey);
+
+    // At this point file SHOULD BE DELETED
+    assertFalse(fs.exists(path));
+  }
+
   /**
    * Check that isPageBlobKey works as expected. This assumes that
    * in the test configuration, the list of supported page blob directories