Переглянути джерело

HADOOP-15086. NativeAzureFileSystem file rename is not atomic.
Contributed by Thomas Marquardt

(cherry picked from commit 52babbb4a0e3c89f2025bf6e9a1b51a96e8f8fb0)

Steve Loughran 7 роки тому
батько
коміт
72a49503fe

+ 12 - 4
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java

@@ -2605,12 +2605,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
   @Override
   public void rename(String srcKey, String dstKey) throws IOException {
-    rename(srcKey, dstKey, false, null);
+    rename(srcKey, dstKey, false, null, true);
   }
 
   @Override
   public void rename(String srcKey, String dstKey, boolean acquireLease,
-      SelfRenewingLease existingLease) throws IOException {
+                     SelfRenewingLease existingLease) throws IOException {
+    rename(srcKey, dstKey, acquireLease, existingLease, true);
+  }
+
+    @Override
+  public void rename(String srcKey, String dstKey, boolean acquireLease,
+      SelfRenewingLease existingLease, boolean overwriteDestination) throws IOException {
 
     LOG.debug("Moving {} to {}", srcKey, dstKey);
 
@@ -2672,7 +2678,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       // a more intensive exponential retry policy when the cluster is getting
       // throttled.
       try {
-        dstBlob.startCopyFromBlob(srcBlob, null, getInstrumentedContext());
+        dstBlob.startCopyFromBlob(srcBlob, null,
+            getInstrumentedContext(), overwriteDestination);
       } catch (StorageException se) {
         if (se.getHttpStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
           int copyBlobMinBackoff = sessionConfiguration.getInt(
@@ -2695,7 +2702,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
           options.setRetryPolicyFactory(new RetryExponentialRetry(
             copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff,
             copyBlobMaxRetries));
-          dstBlob.startCopyFromBlob(srcBlob, options, getInstrumentedContext());
+          dstBlob.startCopyFromBlob(srcBlob, options,
+              getInstrumentedContext(), overwriteDestination);
         } else {
           throw se;
         }

+ 18 - 7
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java

@@ -2720,16 +2720,27 @@ public class NativeAzureFileSystem extends FileSystem {
     } else if (!srcMetadata.isDir()) {
       LOG.debug("Source {} found as a file, renaming.", src);
       try {
-        store.rename(srcKey, dstKey);
+        // HADOOP-15086 - file rename must ensure that the destination does
+        // not exist.  The fix is targeted to this call only to avoid
+        // regressions.  Other call sites are attempting to rename temporary
+        // files, redo a failed rename operation, or rename a directory
+        // recursively; for these cases the destination may exist.
+        store.rename(srcKey, dstKey, false, null,
+            false);
       } catch(IOException ex) {
-
         Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
-        if (innerException instanceof StorageException
-            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
-
-          LOG.debug("BlobNotFoundException encountered. Failing rename", src);
-          return false;
+        if (innerException instanceof StorageException) {
+          if (NativeAzureFileSystemHelper.isFileNotFoundException(
+              (StorageException) innerException)) {
+            LOG.debug("BlobNotFoundException encountered. Failing rename", src);
+            return false;
+          }
+          if (NativeAzureFileSystemHelper.isBlobAlreadyExistsConflict(
+              (StorageException) innerException)) {
+            LOG.debug("Destination BlobAlreadyExists. Failing rename", src);
+            return false;
+          }
         }
 
         throw ex;

+ 18 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azure;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.net.HttpURLConnection;
 import java.util.Map;
 
 import com.google.common.base.Preconditions;
@@ -95,6 +96,23 @@ final class NativeAzureFileSystemHelper {
     return false;
   }
 
+  /*
+   * Determines if a conditional request failed because the blob already
+   * exists.
+   *
+   * @param e - the storage exception thrown by the failed operation.
+   *
+   * @return true if a conditional request failed because the blob already
+   * exists; otherwise, returns false.
+   */
+  static boolean isBlobAlreadyExistsConflict(StorageException e) {
+    if (e.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT
+        && StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(e.getErrorCode())) {
+      return true;
+    }
+    return false;
+  }
+
   /*
    * Helper method that logs stack traces from all live threads.
    */

+ 4 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java

@@ -91,6 +91,10 @@ interface NativeFileSystemStore {
   void rename(String srcKey, String dstKey, boolean acquireLease, SelfRenewingLease existingLease)
       throws IOException;
 
+  void rename(String srcKey, String dstKey, boolean acquireLease,
+              SelfRenewingLease existingLease, boolean overwriteDestination)
+      throws IOException;
+
   /**
    * Delete all keys with the given prefix. Used for testing.
    *

+ 6 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java

@@ -503,10 +503,14 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
 
     @Override
     public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
-        OperationContext opContext)
+        OperationContext opContext, boolean overwriteDestination)
             throws StorageException, URISyntaxException {
+      AccessCondition dstAccessCondition =
+          overwriteDestination
+              ? null
+              : AccessCondition.generateIfNotExistsCondition();
       getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
-          null, null, options, opContext);
+          null, dstAccessCondition, options, opContext);
     }
 
     @Override

+ 1 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java

@@ -406,7 +406,7 @@ abstract class StorageInterface {
      *
      */
     public abstract void startCopyFromBlob(CloudBlobWrapper sourceBlob,
-        BlobRequestOptions options, OperationContext opContext)
+        BlobRequestOptions options, OperationContext opContext, boolean overwriteDestination)
         throws StorageException, URISyntaxException;
     
     /**

+ 6 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java

@@ -425,10 +425,14 @@ class StorageInterfaceImpl extends StorageInterface {
 
     @Override
     public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
-        OperationContext opContext)
+        OperationContext opContext, boolean overwriteDestination)
             throws StorageException, URISyntaxException {
+      AccessCondition dstAccessCondition =
+          overwriteDestination
+              ? null
+              : AccessCondition.generateIfNotExistsCondition();
       getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
-          null, null, options, opContext);
+          null, dstAccessCondition, options, opContext);
     }
 
     @Override

+ 72 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java

@@ -18,8 +18,16 @@
 
 package org.apache.hadoop.fs.azure;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -40,6 +48,70 @@ public class ITestNativeAzureFileSystemLive extends
     return AzureBlobStorageTestAccount.create();
   }
 
+  /**
+   * Tests the rename file operation to ensure that when there are multiple
+   * attempts to rename a file to the same destination, only one rename
+   * operation is successful (HADOOP-15086).
+   */
+  @Test
+  public void testMultipleRenameFileOperationsToSameDestination()
+      throws IOException, InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(1);
+    final AtomicInteger successfulRenameCount = new AtomicInteger(0);
+    final AtomicReference<IOException> unexpectedError = new AtomicReference<IOException>();
+    final Path dest = path("dest");
+
+    // Run 10 threads to rename multiple files to the same target path
+    List<Thread> threads = new ArrayList<>();
+
+    for (int i = 0; i < 10; i++) {
+      final int threadNumber = i;
+      Path src = path("test" + threadNumber);
+      threads.add(new Thread(() -> {
+        try {
+          latch.await(Long.MAX_VALUE, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+        }
+        try {
+          try (OutputStream output = fs.create(src)) {
+            output.write(("Source file number " + threadNumber).getBytes());
+          }
+
+          if (fs.rename(src, dest)) {
+            LOG.info("rename succeeded for thread " + threadNumber);
+            successfulRenameCount.incrementAndGet();
+          }
+        } catch (IOException e) {
+          unexpectedError.compareAndSet(null, e);
+          ContractTestUtils.fail("Exception unexpected", e);
+        }
+      }));
+    }
+
+    // Start each thread
+    threads.forEach(t -> t.start());
+
+    // Wait for threads to start and wait on latch
+    Thread.sleep(2000);
+
+    // Now start to rename
+    latch.countDown();
+
+    // Wait for all threads to complete
+    threads.forEach(t -> {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+      }
+    });
+
+    if (unexpectedError.get() != null) {
+      throw unexpectedError.get();
+    }
+    assertEquals(1, successfulRenameCount.get());
+    LOG.info("Success, only one rename operation succeeded!");
+  }
+
   @Test
   public void testLazyRenamePendingCanOverwriteExistingFile()
     throws Exception {

+ 8 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java

@@ -425,7 +425,14 @@ public class MockStorageInterface extends StorageInterface {
 
     @Override
     public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
-        OperationContext opContext) throws StorageException, URISyntaxException {
+        OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException {
+      if (!overwriteDestination && backingStore.exists(convertUriToDecodedString(uri))) {
+        throw new StorageException("BlobAlreadyExists",
+            "The blob already exists.",
+            HttpURLConnection.HTTP_CONFLICT,
+            null,
+            null);
+      }
       backingStore.copy(convertUriToDecodedString(sourceBlob.getUri()), convertUriToDecodedString(uri));
       //TODO: set the backingStore.properties.CopyState and
       //      update azureNativeFileSystemStore.waitForCopyToComplete