ソースを参照

HADOOP-18925. S3A: option to enable/disable CopyFromLocalOperation (#6163) (#6259)

Add a new option fs.s3a.optimized.copy.from.local.enabled

This will enable (default) or disable the
optimized CopyFromLocalOperation upload operation
when copyFromLocalFile() is invoked.

When false the superclass implementation is used; duration
statistics are still collected, though audit span entries
in logs will be for the individual fs operations, not the
overall operation.

Contributed by Steve Loughran
Steve Loughran 1 年間 前
コミット
3b7a7f447f

+ 12 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -1299,4 +1299,16 @@ public final class Constants {
    */
   public static final int DEFAULT_PREFETCH_MAX_BLOCKS_COUNT = 4;
 
+  /**
+   * Is the higher performance copy from local file to S3 enabled?
+   * This switch allows for it to be disabled if there are problems.
+   * Value: {@value}.
+   */
+  public static final String OPTIMIZED_COPY_FROM_LOCAL = "fs.s3a.optimized.copy.from.local.enabled";
+
+  /**
+   * Default value for {@link #OPTIMIZED_COPY_FROM_LOCAL}.
+   * Value: {@value}.
+   */
+  public static final boolean OPTIMIZED_COPY_FROM_LOCAL_DEFAULT = true;
 }

+ 67 - 29
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -429,6 +429,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private String scheme = FS_S3A;
 
+  /**
+   * Flag to indicate that the higher performance copyFromLocalFile implementation
+   * should be used.
+   */
+  private boolean optimizedCopyFromLocal;
+
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -654,6 +660,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
               AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
       vectoredIOContext = populateVectoredIOContext(conf);
       scheme = (this.uri != null && this.uri.getScheme() != null) ? this.uri.getScheme() : FS_S3A;
+      optimizedCopyFromLocal = conf.getBoolean(OPTIMIZED_COPY_FROM_LOCAL,
+          OPTIMIZED_COPY_FROM_LOCAL_DEFAULT);
+      LOG.debug("Using optimized copyFromLocal implementation: {}", optimizedCopyFromLocal);
     } catch (AmazonClientException e) {
       // amazon client exception: stop all services then throw the translation
       cleanupWithLogger(LOG, span);
@@ -3825,9 +3834,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * the given dst name.
    *
    * This version doesn't need to create a temporary file to calculate the md5.
-   * Sadly this doesn't seem to be used by the shell cp :(
+   * If {@link Constants#OPTIMIZED_COPY_FROM_LOCAL} is set to false,
+   * the superclass implementation is used.
    *
-   * delSrc indicates if the source should be removed
    * @param delSrc whether to delete the src
    * @param overwrite whether to overwrite an existing file
    * @param src path
@@ -3842,28 +3851,53 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
                                 Path dst) throws IOException {
     checkNotClosed();
-    LOG.debug("Copying local file from {} to {}", src, dst);
-    trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst,
-        () -> new CopyFromLocalOperation(
-            createStoreContext(),
-            src,
-            dst,
-            delSrc,
-            overwrite,
-            createCopyFromLocalCallbacks()).execute());
+    LOG.debug("Copying local file from {} to {} (delSrc={} overwrite={}",
+        src, dst, delSrc, overwrite);
+    if (optimizedCopyFromLocal) {
+      trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () ->
+          new CopyFromLocalOperation(
+              createStoreContext(),
+              src,
+              dst,
+              delSrc,
+              overwrite,
+              createCopyFromLocalCallbacks(getActiveAuditSpan()))
+              .execute());
+    } else {
+      // call the superclass, but still count statistics.
+      // there is no overall span here, as each FS API call will
+      // be in its own span.
+      LOG.debug("Using base copyFromLocalFile implementation");
+      trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> {
+        super.copyFromLocalFile(delSrc, overwrite, src, dst);
+        return null;
+      });
+    }
   }
 
+  /**
+   * Create the CopyFromLocalCallbacks;
+   * protected to assist in mocking.
+   * @param span audit span.
+   * @return the callbacks
+   * @throws IOException failure to get the local fs.
+   */
   protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
-      createCopyFromLocalCallbacks() throws IOException {
+      createCopyFromLocalCallbacks(final AuditSpanS3A span) throws IOException {
     LocalFileSystem local = getLocal(getConf());
-    return new CopyFromLocalCallbacksImpl(local);
+    return new CopyFromLocalCallbacksImpl(span, local);
   }
 
   protected final class CopyFromLocalCallbacksImpl implements
       CopyFromLocalOperation.CopyFromLocalOperationCallbacks {
+
+    /** Span to use for all operations. */
+    private final AuditSpanS3A span;
     private final LocalFileSystem local;
 
-    private CopyFromLocalCallbacksImpl(LocalFileSystem local) {
+    private CopyFromLocalCallbacksImpl(final AuditSpanS3A span,
+        LocalFileSystem local) {
+      this.span = span;
       this.local = local;
     }
 
@@ -3885,21 +3919,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
     @Override
     public void copyLocalFileFromTo(File file, Path from, Path to) throws IOException {
-      trackDurationAndSpan(
-          OBJECT_PUT_REQUESTS,
-          to,
-          () -> {
-            final String key = pathToKey(to);
-            final ObjectMetadata om = newObjectMetadata(file.length());
-            Progressable progress = null;
-            PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, file);
-            S3AFileSystem.this.invoker.retry(
-                "putObject(" + "" + ")", to.toString(),
-                true,
-                () -> executePut(putObjectRequest, progress, putOptionsForPath(to)));
-
-            return null;
-          });
+      // the duration of the put is measured, but the active span is the
+      // constructor-supplied one -this ensures all audit log events are grouped correctly
+      span.activate();
+      trackDuration(getDurationTrackerFactory(), OBJECT_PUT_REQUESTS.getSymbol(), () -> {
+        final String key = pathToKey(to);
+        final ObjectMetadata om = newObjectMetadata(file.length());
+        Progressable progress = null;
+        PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, file);
+        S3AFileSystem.this.invoker.retry(
+            "putObject()", to.toString(),
+            true,
+            () -> executePut(putObjectRequest, progress, putOptionsForPath(to)));
+
+        return null;
+      });
     }
 
     @Override
@@ -5149,6 +5183,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     case FS_S3A_CREATE_HEADER:
       return true;
 
+      // is the optimized copy from local enabled.
+    case OPTIMIZED_COPY_FROM_LOCAL:
+      return optimizedCopyFromLocal;
+
     default:
       return super.hasPathCapability(p, cap);
     }

+ 10 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md

@@ -2058,3 +2058,13 @@ updated to implement `software.amazon.awssdk.core.signer.Signer`.
 This will be logged when `getObjectMetadata` is called. In SDK V2, this operation has changed to
 `headObject()` and will return a response of the type `HeadObjectResponse`.
 
+
+### <a name="debug-switches"></a> Debugging Switches
+
+There are some switches which can be set to enable/disable features and assist
+in isolating problems and at least make them "go away".
+
+
+| Key  | Default | Action   |
+|------|---------|----------|
+| `fs.s3a.optimized.copy.from.local.enabled` | `true`  | [HADOOP-18925](https://issues.apache.org/jira/browse/HADOOP-18925) enable/disable CopyFromLocalOperation. Also a path capability. |

+ 51 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.fs.s3a;
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractCopyFromLocalTest;
@@ -26,18 +28,67 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
 
 import org.apache.hadoop.fs.Path;
+
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import static org.apache.hadoop.fs.s3a.Constants.OPTIMIZED_COPY_FROM_LOCAL;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
+/**
+ * Test copying files from the local filesystem to S3A.
+ * Parameterized on whether or not the optimized
+ * copyFromLocalFile is enabled.
+ */
+@RunWith(Parameterized.class)
 public class ITestS3ACopyFromLocalFile extends
         AbstractContractCopyFromLocalTest {
+  /**
+   * Parameterization.
+   */
+  @Parameterized.Parameters(name = "enabled={0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {true},
+        {false},
+    });
+  }
+  private final boolean enabled;
+
+  public ITestS3ACopyFromLocalFile(final boolean enabled) {
+    this.enabled = enabled;
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+
+    removeBaseAndBucketOverrides(getTestBucketName(conf), conf,
+        OPTIMIZED_COPY_FROM_LOCAL);
+    conf.setBoolean(OPTIMIZED_COPY_FROM_LOCAL, enabled);
+    disableFilesystemCaching(conf);
+    return conf;
+  }
 
   @Override
   protected AbstractFSContract createContract(Configuration conf) {
     return new S3AContract(conf);
   }
 
+  @Test
+  public void testOptionPropagation() throws Throwable {
+    Assertions.assertThat(getFileSystem().hasPathCapability(new Path("/"),
+        OPTIMIZED_COPY_FROM_LOCAL))
+        .describedAs("path capability of %s", OPTIMIZED_COPY_FROM_LOCAL)
+        .isEqualTo(enabled);
+
+  }
+
   @Test
   public void testLocalFilesOnly() throws Throwable {
     describe("Copying into other file systems must fail");