Parcourir la source

HADOOP-16030. AliyunOSS: bring fixes back from HADOOP-15671. Contributed by wujinhu.

(cherry picked from commit f87b3b11c46704dcdb63089dd971e2a5ba1deaac)
Weiwei Yang il y a 6 ans
Parent
commit
cc6e7b8a3d

+ 3 - 2
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java

@@ -124,7 +124,8 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
         if (null == partETags) {
           throw new IOException("Failed to multipart upload to oss, abort it.");
         }
-        store.completeMultipartUpload(key, uploadId, partETags);
+        store.completeMultipartUpload(key, uploadId,
+            new ArrayList<>(partETags));
       }
     } finally {
       removePartFiles();
@@ -133,7 +134,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
   }
 
   @Override
-  public void write(int b) throws IOException {
+  public synchronized void write(int b) throws IOException {
     singleByte[0] = (byte)b;
     write(singleByte, 0, 1);
   }

+ 3 - 2
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java

@@ -150,7 +150,7 @@ public class AliyunOSSFileSystemStore {
           "null or empty. Please set proper endpoint with 'fs.oss.endpoint'.");
     }
     CredentialsProvider provider =
-        AliyunOSSUtils.getCredentialsProvider(conf);
+        AliyunOSSUtils.getCredentialsProvider(uri, conf);
     ossClient = new OSSClient(endPoint, provider, clientConf);
     uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf,
         MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
@@ -158,6 +158,8 @@ public class AliyunOSSFileSystemStore {
     serverSideEncryptionAlgorithm =
         conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
 
+    bucketName = uri.getHost();
+
     String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
     if (StringUtils.isNotEmpty(cannedACLName)) {
       CannedAccessControlList cannedACL =
@@ -167,7 +169,6 @@ public class AliyunOSSFileSystemStore {
     }
 
     maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
-    bucketName = uri.getHost();
   }
 
   /**

+ 5 - 3
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.aliyun.oss;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 
 import com.aliyun.oss.common.auth.CredentialsProvider;
 import com.google.common.base.Preconditions;
@@ -95,13 +96,14 @@ final public class AliyunOSSUtils {
    * Create credential provider specified by configuration, or create default
    * credential provider if not specified.
    *
+   * @param uri uri passed by caller
    * @param conf configuration
    * @return a credential provider
    * @throws IOException on any problem. Class construction issues may be
    * nested inside the IOE.
    */
-  public static CredentialsProvider getCredentialsProvider(Configuration conf)
-      throws IOException {
+  public static CredentialsProvider getCredentialsProvider(
+      URI uri, Configuration conf) throws IOException {
     CredentialsProvider credentials;
 
     String className = conf.getTrimmed(CREDENTIALS_PROVIDER_KEY);
@@ -117,7 +119,7 @@ final public class AliyunOSSUtils {
         try {
           credentials =
               (CredentialsProvider)credClass.getDeclaredConstructor(
-                  Configuration.class).newInstance(conf);
+                  URI.class, Configuration.class).newInstance(uri, conf);
         } catch (NoSuchMethodException | SecurityException e) {
           credentials =
               (CredentialsProvider)credClass.getDeclaredConstructor()

+ 21 - 4
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunCredentials.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.aliyun.oss;
 
 import com.aliyun.oss.common.auth.Credentials;
+import com.aliyun.oss.common.auth.CredentialsProvider;
 import com.aliyun.oss.common.auth.InvalidCredentialsException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.aliyun.oss.contract.AliyunOSSContract;
@@ -27,6 +28,8 @@ import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
 
 import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_ID;
 import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET;
@@ -63,16 +66,30 @@ public class TestAliyunCredentials extends AbstractFSContractTestBase {
     validateCredential(conf);
   }
 
-  private void validateCredential(Configuration conf) {
+  private void validateCredential(URI uri, Configuration conf) {
     try {
-      AliyunCredentialsProvider provider
-          = new AliyunCredentialsProvider(conf);
+      CredentialsProvider provider =
+          AliyunOSSUtils.getCredentialsProvider(uri, conf);
       Credentials credentials = provider.getCredentials();
       fail("Expected a CredentialInitializationException, got " + credentials);
     } catch (InvalidCredentialsException expected) {
       // expected
     } catch (IOException e) {
-      fail("Unexpected exception.");
+      Throwable cause = e.getCause();
+      if (cause instanceof InvocationTargetException) {
+        boolean isInstance =
+            ((InvocationTargetException)cause).getTargetException()
+                instanceof InvalidCredentialsException;
+        if (!isInstance) {
+          fail("Unexpected exception.");
+        }
+      } else {
+        fail("Unexpected exception.");
+      }
     }
   }
+
+  private void validateCredential(Configuration conf) {
+    validateCredential(null, conf);
+  }
 }