Переглянути джерело

HADOOP-18925. S3A: option to enable/disable CopyFromLocalOperation (#6163)

Add a new option:
fs.s3a.optimized.copy.from.local.enabled

This will enable (default) or disable the
optimized CopyFromLocalOperation upload operation
when copyFromLocalFile() is invoked.

When false the superclass implementation is used; duration
statistics are still collected, though audit span entries
in logs will be for the individual fs operations, not the
overall operation.

Contributed by Steve Loughran
Steve Loughran 1 рік тому
батько
коміт
141b0d3508

+ 13 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -1496,4 +1496,17 @@ public final class Constants {
    */
   public static final boolean DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT = false;
 
+
+  /**
+   * Is the higher performance copy from local file to S3 enabled?
+   * This switch allows for it to be disabled if there are problems.
+   * Value: {@value}.
+   */
+  public static final String OPTIMIZED_COPY_FROM_LOCAL = "fs.s3a.optimized.copy.from.local.enabled";
+
+  /**
+   * Default value for {@link #OPTIMIZED_COPY_FROM_LOCAL}.
+   * Value: {@value}.
+   */
+  public static final boolean OPTIMIZED_COPY_FROM_LOCAL_DEFAULT = true;
 }

+ 64 - 29
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -464,6 +464,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private String scheme = FS_S3A;
 
+  /**
+   * Flag to indicate that the higher performance copyFromLocalFile implementation
+   * should be used.
+   */
+  private boolean optimizedCopyFromLocal;
+
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -700,6 +706,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
               AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
       vectoredIOContext = populateVectoredIOContext(conf);
       scheme = (this.uri != null && this.uri.getScheme() != null) ? this.uri.getScheme() : FS_S3A;
+      optimizedCopyFromLocal = conf.getBoolean(OPTIMIZED_COPY_FROM_LOCAL,
+          OPTIMIZED_COPY_FROM_LOCAL_DEFAULT);
+      LOG.debug("Using optimized copyFromLocal implementation: {}", optimizedCopyFromLocal);
     } catch (SdkException e) {
       // amazon client exception: stop all services then throw the translation
       cleanupWithLogger(LOG, span);
@@ -4028,9 +4037,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * the given dst name.
    *
    * This version doesn't need to create a temporary file to calculate the md5.
-   * Sadly this doesn't seem to be used by the shell cp :(
+   * If {@link Constants#OPTIMIZED_COPY_FROM_LOCAL} is set to false,
+   * the superclass implementation is used.
    *
-   * delSrc indicates if the source should be removed
    * @param delSrc whether to delete the src
    * @param overwrite whether to overwrite an existing file
    * @param src path
@@ -4038,35 +4047,59 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * @throws IOException IO problem
    * @throws FileAlreadyExistsException the destination file exists and
    * overwrite==false
-   * @throws SdkException failure in the AWS SDK
    */
   @Override
   @AuditEntryPoint
   public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
                                 Path dst) throws IOException {
     checkNotClosed();
-    LOG.debug("Copying local file from {} to {}", src, dst);
-    trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst,
-        () -> new CopyFromLocalOperation(
-            createStoreContext(),
-            src,
-            dst,
-            delSrc,
-            overwrite,
-            createCopyFromLocalCallbacks()).execute());
+    LOG.debug("Copying local file from {} to {} (delSrc={} overwrite={}",
+        src, dst, delSrc, overwrite);
+    if (optimizedCopyFromLocal) {
+      trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () ->
+          new CopyFromLocalOperation(
+              createStoreContext(),
+              src,
+              dst,
+              delSrc,
+              overwrite,
+              createCopyFromLocalCallbacks(getActiveAuditSpan()))
+              .execute());
+    } else {
+      // call the superclass, but still count statistics.
+      // there is no overall span here, as each FS API call will
+      // be in its own span.
+      LOG.debug("Using base copyFromLocalFile implementation");
+      trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> {
+        super.copyFromLocalFile(delSrc, overwrite, src, dst);
+        return null;
+      });
+    }
   }
 
+  /**
+   * Create the CopyFromLocalCallbacks;
+   * protected to assist in mocking.
+   * @param span audit span.
+   * @return the callbacks
+   * @throws IOException failure to get the local fs.
+   */
   protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
-      createCopyFromLocalCallbacks() throws IOException {
+      createCopyFromLocalCallbacks(final AuditSpanS3A span) throws IOException {
     LocalFileSystem local = getLocal(getConf());
-    return new CopyFromLocalCallbacksImpl(local);
+    return new CopyFromLocalCallbacksImpl(span, local);
   }
 
   protected final class CopyFromLocalCallbacksImpl implements
       CopyFromLocalOperation.CopyFromLocalOperationCallbacks {
+
+    /** Span to use for all operations. */
+    private final AuditSpanS3A span;
     private final LocalFileSystem local;
 
-    private CopyFromLocalCallbacksImpl(LocalFileSystem local) {
+    private CopyFromLocalCallbacksImpl(final AuditSpanS3A span,
+        LocalFileSystem local) {
+      this.span = span;
       this.local = local;
     }
 
@@ -4088,20 +4121,18 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
     @Override
     public void copyLocalFileFromTo(File file, Path from, Path to) throws IOException {
-      trackDurationAndSpan(
-          OBJECT_PUT_REQUESTS,
-          to,
-          () -> {
-            final String key = pathToKey(to);
-            Progressable progress = null;
-            PutObjectRequest.Builder putObjectRequestBuilder =
-                newPutObjectRequestBuilder(key, file.length(), false);
-            S3AFileSystem.this.invoker.retry("putObject(" + "" + ")", to.toString(), true,
-                () -> executePut(putObjectRequestBuilder.build(), progress, putOptionsForPath(to),
-                    file));
-
-            return null;
-          });
+      // the duration of the put is measured, but the active span is the
+      // constructor-supplied one -this ensures all audit log events are grouped correctly
+      span.activate();
+      trackDuration(getDurationTrackerFactory(), OBJECT_PUT_REQUESTS.getSymbol(), () -> {
+        final String key = pathToKey(to);
+        PutObjectRequest.Builder putObjectRequestBuilder =
+            newPutObjectRequestBuilder(key, file.length(), false);
+        final String dest = to.toString();
+        S3AFileSystem.this.invoker.retry("putObject(" + dest + ")", dest, true, () ->
+            executePut(putObjectRequestBuilder.build(), null, putOptionsForPath(to), file));
+        return null;
+      });
     }
 
     @Override
@@ -5403,6 +5434,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     case FS_S3A_CREATE_PERFORMANCE_ENABLED:
       return performanceCreation;
 
+      // is the optimized copy from local enabled.
+    case OPTIMIZED_COPY_FROM_LOCAL:
+      return optimizedCopyFromLocal;
+
     default:
       return super.hasPathCapability(p, cap);
     }

+ 10 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md

@@ -1544,3 +1544,13 @@ software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP
 
 When this happens, try to set `fs.s3a.connection.request.timeout` to a larger value or disable it
 completely by setting it to `0`.
+
+### <a name="debug-switches"></a> Debugging Switches
+
+There are some switches which can be set to enable/disable features and assist
+in isolating problems and at least make them "go away".
+
+
+| Key  | Default | Action   |
+|------|---------|----------|
+| `fs.s3a.optimized.copy.from.local.enabled` | `true`  | [HADOOP-18925](https://issues.apache.org/jira/browse/HADOOP-18925) enable/disable CopyFromLocalOperation. Also a path capability. |

+ 51 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACopyFromLocalFile.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.fs.s3a;
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractContractCopyFromLocalTest;
@@ -26,18 +28,67 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
 
 import org.apache.hadoop.fs.Path;
+
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import static org.apache.hadoop.fs.s3a.Constants.OPTIMIZED_COPY_FROM_LOCAL;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
+/**
+ * Test copying files from the local filesystem to S3A.
+ * Parameterized on whether or not the optimized
+ * copyFromLocalFile is enabled.
+ */
+@RunWith(Parameterized.class)
 public class ITestS3ACopyFromLocalFile extends
         AbstractContractCopyFromLocalTest {
+  /**
+   * Parameterization.
+   */
+  @Parameterized.Parameters(name = "enabled={0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {true},
+        {false},
+    });
+  }
+  private final boolean enabled;
+
+  public ITestS3ACopyFromLocalFile(final boolean enabled) {
+    this.enabled = enabled;
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+
+    removeBaseAndBucketOverrides(getTestBucketName(conf), conf,
+        OPTIMIZED_COPY_FROM_LOCAL);
+    conf.setBoolean(OPTIMIZED_COPY_FROM_LOCAL, enabled);
+    disableFilesystemCaching(conf);
+    return conf;
+  }
 
   @Override
   protected AbstractFSContract createContract(Configuration conf) {
     return new S3AContract(conf);
   }
 
+  @Test
+  public void testOptionPropagation() throws Throwable {
+    Assertions.assertThat(getFileSystem().hasPathCapability(new Path("/"),
+        OPTIMIZED_COPY_FROM_LOCAL))
+        .describedAs("path capability of %s", OPTIMIZED_COPY_FROM_LOCAL)
+        .isEqualTo(enabled);
+
+  }
+
   @Test
   public void testLocalFilesOnly() throws Throwable {
     describe("Copying into other file systems must fail");