Explorar o código

HADOOP-18339. S3A storage class option only picked up when buffering writes to disk. (#4669)

Follow-up to HADOOP-12020 Support configuration of different S3 storage classes; 
S3 storage class is now set when buffering to heap/bytebuffers, and when
creating directory markers

Contributed by Monthon Klongklaew
monthonk %!s(int64=2) %!d(string=hai) anos
pai
achega
20560401ec

+ 13 - 7
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java

@@ -408,6 +408,9 @@ public class RequestFactoryImpl implements RequestFactory {
         inputStream, metadata);
     setOptionalPutRequestParameters(putObjectRequest);
     putObjectRequest.setCannedAcl(cannedACL);
+    if (storageClass != null) {
+      putObjectRequest.setStorageClass(storageClass);
+    }
     return prepareRequest(putObjectRequest);
   }
 
@@ -416,19 +419,22 @@ public class RequestFactoryImpl implements RequestFactory {
     String key = directory.endsWith("/")
         ? directory
         : (directory + "/");
-    // an input stream which is laways empty
-    final InputStream im = new InputStream() {
+    // an input stream which is always empty
+    final InputStream inputStream = new InputStream() {
       @Override
       public int read() throws IOException {
         return -1;
       }
     };
     // preparation happens in here
-    final ObjectMetadata md = createObjectMetadata(0L, true);
-    md.setContentType(HeaderProcessing.CONTENT_TYPE_X_DIRECTORY);
-    PutObjectRequest putObjectRequest =
-        newPutObjectRequest(key, md, null, im);
-    return putObjectRequest;
+    final ObjectMetadata metadata = createObjectMetadata(0L, true);
+    metadata.setContentType(HeaderProcessing.CONTENT_TYPE_X_DIRECTORY);
+
+    PutObjectRequest putObjectRequest = new PutObjectRequest(getBucket(), key,
+        inputStream, metadata);
+    setOptionalPutRequestParameters(putObjectRequest);
+    putObjectRequest.setCannedAcl(cannedACL);
+    return prepareRequest(putObjectRequest);
   }
 
   @Override

+ 29 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AStorageClass.java

@@ -19,10 +19,14 @@
 package org.apache.hadoop.fs.s3a;
 
 import java.nio.file.AccessDeniedException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Map;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,6 +34,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
 
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_DISK;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BYTEBUFFER;
 import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS;
 import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_GLACIER;
 import static org.apache.hadoop.fs.s3a.Constants.STORAGE_CLASS_REDUCED_REDUNDANCY;
@@ -43,13 +51,33 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 /**
  * Tests of storage class.
  */
+@RunWith(Parameterized.class)
 public class ITestS3AStorageClass extends AbstractS3ATestBase {
 
+  /**
+   * HADOOP-18339. Parameterized the test for different fast upload buffer types
+   * to ensure the storage class configuration works with all of them.
+   */
+  @Parameterized.Parameters(name = "fast-upload-buffer-{0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {FAST_UPLOAD_BUFFER_DISK},
+        {FAST_UPLOAD_BUFFER_ARRAY}
+    });
+  }
+
+  private final String fastUploadBufferType;
+
+  public ITestS3AStorageClass(String fastUploadBufferType) {
+    this.fastUploadBufferType = fastUploadBufferType;
+  }
+
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
     disableFilesystemCaching(conf);
-    removeBaseAndBucketOverrides(conf, STORAGE_CLASS);
+    removeBaseAndBucketOverrides(conf, STORAGE_CLASS, FAST_UPLOAD_BUFFER);
+    conf.set(FAST_UPLOAD_BUFFER, fastUploadBufferType);
 
     return conf;
   }