فهرست منبع

HADOOP-15323. AliyunOSS: Improve copy file performance for AliyunOSSFileSystemStore. Contributed by wujinhu.

(cherry picked from commit 040a202b202a37f3b922cd321eb0a8ded457d88b)
Weiwei Yang 6 سال پیش
والد
کامیت
3bbc794834

+ 5 - 2
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java

@@ -32,13 +32,16 @@ public class AliyunOSSCopyFileTask implements Runnable {
 
   private AliyunOSSFileSystemStore store;
   private String srcKey;
+  private long srcLen;
   private String dstKey;
   private AliyunOSSCopyFileContext copyFileContext;
 
   public AliyunOSSCopyFileTask(AliyunOSSFileSystemStore store,
-      String srcKey, String dstKey, AliyunOSSCopyFileContext copyFileContext) {
+      String srcKey, long srcLen,
+      String dstKey, AliyunOSSCopyFileContext copyFileContext) {
     this.store = store;
     this.srcKey = srcKey;
+    this.srcLen = srcLen;
     this.dstKey = dstKey;
     this.copyFileContext = copyFileContext;
   }
@@ -47,7 +50,7 @@ public class AliyunOSSCopyFileTask implements Runnable {
   public void run() {
     boolean fail = false;
     try {
-      store.copyFile(srcKey, dstKey);
+      store.copyFile(srcKey, srcLen, dstKey);
     } catch (Exception e) {
       LOG.warn("Exception thrown when copy from "
           + srcKey + " to " + dstKey +  ", exception: " + e);

+ 6 - 4
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java

@@ -653,7 +653,7 @@ public class AliyunOSSFileSystem extends FileSystem {
     if (srcStatus.isDirectory()) {
       copyDirectory(srcPath, dstPath);
     } else {
-      copyFile(srcPath, dstPath);
+      copyFile(srcPath, srcStatus.getLen(), dstPath);
     }
 
     return srcPath.equals(dstPath) || delete(srcPath, true);
@@ -664,13 +664,14 @@ public class AliyunOSSFileSystem extends FileSystem {
    * (the caller should make sure srcPath is a file and dstPath is valid)
    *
    * @param srcPath source path.
+   * @param srcLen source path length if it is a file.
    * @param dstPath destination path.
    * @return true if file is successfully copied.
    */
-  private boolean copyFile(Path srcPath, Path dstPath) {
+  private boolean copyFile(Path srcPath, long srcLen, Path dstPath) {
     String srcKey = pathToKey(srcPath);
     String dstKey = pathToKey(dstPath);
-    return store.copyFile(srcKey, dstKey);
+    return store.copyFile(srcKey, srcLen, dstKey);
   }
 
   /**
@@ -709,7 +710,8 @@ public class AliyunOSSFileSystem extends FileSystem {
 
         //copy operation just copies metadata, oss will support shallow copy
         executorService.execute(new AliyunOSSCopyFileTask(
-            store, objectSummary.getKey(), newKey, copyFileContext));
+            store, objectSummary.getKey(),
+            objectSummary.getSize(), newKey, copyFileContext));
         copiesToFinish++;
         // No need to call lock() here.
         // It's ok to copy one more file if the rename operation failed

+ 10 - 21
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java

@@ -87,7 +87,6 @@ public class AliyunOSSFileSystemStore {
   private OSSClient ossClient;
   private String bucketName;
   private long uploadPartSize;
-  private long multipartThreshold;
   private int maxKeys;
   private String serverSideEncryptionAlgorithm;
 
@@ -155,21 +154,10 @@ public class AliyunOSSFileSystemStore {
     ossClient = new OSSClient(endPoint, provider, clientConf);
     uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf,
         MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
-    multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
-        MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
+
     serverSideEncryptionAlgorithm =
         conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
 
-    if (multipartThreshold < 5 * 1024 * 1024) {
-      LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
-      multipartThreshold = 5 * 1024 * 1024;
-    }
-
-    if (multipartThreshold > 1024 * 1024 * 1024) {
-      LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB");
-      multipartThreshold = 1024 * 1024 * 1024;
-    }
-
     bucketName = uri.getHost();
 
     String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
@@ -305,18 +293,19 @@ public class AliyunOSSFileSystemStore {
    * Copy an object from source key to destination key.
    *
    * @param srcKey source key.
+   * @param srcLen source file length.
    * @param dstKey destination key.
    * @return true if file is successfully copied.
    */
-  public boolean copyFile(String srcKey, String dstKey) {
-    ObjectMetadata objectMeta =
-        ossClient.getObjectMetadata(bucketName, srcKey);
-    statistics.incrementReadOps(1);
-    long contentLength = objectMeta.getContentLength();
-    if (contentLength <= multipartThreshold) {
+  public boolean copyFile(String srcKey, long srcLen, String dstKey) {
+    try {
+      //1, try single copy first
       return singleCopy(srcKey, dstKey);
-    } else {
-      return multipartCopy(srcKey, contentLength, dstKey);
+    } catch (Exception e) {
+      //2, if failed(shallow copy not supported), then multi part copy
+      LOG.debug("Exception thrown when copy file: " + srcKey
+          + ", exception: " + e + ", use multipartCopy instead");
+      return multipartCopy(srcKey, srcLen, dstKey);
     }
   }
 

+ 3 - 1
hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md

@@ -282,7 +282,9 @@ please raise your issues with them.
     <property>
       <name>fs.oss.multipart.upload.threshold</name>
       <value>20971520</value>
-      <description>Minimum size in bytes before we start a multipart uploads or copy.</description>
+      <description>Minimum size in bytes before we start a multipart upload or copy.
+        Notice: This property is deprecated and will be removed in a future version.
+      </description>
     </property>
 
     <property>

+ 0 - 1
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java

@@ -49,7 +49,6 @@ public class TestAliyunOSSBlockOutputStream {
   @Before
   public void setUp() throws Exception {
     Configuration conf = new Configuration();
-    conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024);
     conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024);
     conf.setInt(IO_CHUNK_BUFFER_SIZE,
         conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0));

+ 7 - 7
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java

@@ -177,13 +177,13 @@ public class TestAliyunOSSFileSystemContract
     AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
     store.storeEmptyFile("test/new/file/");
     AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
-        store, srcOne.toUri().getPath().substring(1),
+        store, srcOne.toUri().getPath().substring(1), data.length,
         dstOne.toUri().getPath().substring(1), copyFileContext);
     oneCopyFileTask.run();
     assumeFalse(copyFileContext.isCopyFailure());
 
     AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
-        store, srcOne.toUri().getPath().substring(1),
+        store, srcOne.toUri().getPath().substring(1), data.length,
         dstTwo.toUri().getPath().substring(1), copyFileContext);
     twoCopyFileTask.run();
     assumeFalse(copyFileContext.isCopyFailure());
@@ -211,13 +211,13 @@ public class TestAliyunOSSFileSystemContract
     AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
     //store.storeEmptyFile("test/new/file/");
     AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
-        store, srcOne.toUri().getPath().substring(1),
+        store, srcOne.toUri().getPath().substring(1), data.length,
         dstOne.toUri().getPath().substring(1), copyFileContext);
     oneCopyFileTask.run();
     assumeTrue(copyFileContext.isCopyFailure());
 
     AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
-        store, srcOne.toUri().getPath().substring(1),
+        store, srcOne.toUri().getPath().substring(1), data.length,
         dstTwo.toUri().getPath().substring(1), copyFileContext);
     twoCopyFileTask.run();
     assumeTrue(copyFileContext.isCopyFailure());
@@ -246,19 +246,19 @@ public class TestAliyunOSSFileSystemContract
     AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore();
     //store.storeEmptyFile("test/new/file/");
     AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask(
-        store, srcOne.toUri().getPath().substring(1),
+        store, srcOne.toUri().getPath().substring(1), data.length,
         dstOne.toUri().getPath().substring(1), copyFileContext);
     oneCopyFileTask.run();
     assumeTrue(copyFileContext.isCopyFailure());
 
     AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask(
-        store, srcOne.toUri().getPath().substring(1),
+        store, srcOne.toUri().getPath().substring(1), data.length,
         dstTwo.toUri().getPath().substring(1), copyFileContext);
     twoCopyFileTask.run();
     assumeTrue(copyFileContext.isCopyFailure());
 
     AliyunOSSCopyFileTask threeCopyFileTask = new AliyunOSSCopyFileTask(
-        store, srcOne.toUri().getPath().substring(1),
+        store, srcOne.toUri().getPath().substring(1), data.length,
         dstThree.toUri().getPath().substring(1), copyFileContext);
     threeCopyFileTask.run();
     assumeTrue(copyFileContext.isCopyFailure());

+ 5 - 5
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java

@@ -78,8 +78,6 @@ public class TestAliyunOSSFileSystemStore {
 
   protected void writeRenameReadCompare(Path path, long len)
       throws IOException, NoSuchAlgorithmException {
-    // If len > fs.oss.multipart.upload.threshold,
-    // we'll use a multipart upload copy
     MessageDigest digest = MessageDigest.getInstance("MD5");
     OutputStream out = new BufferedOutputStream(
         new DigestOutputStream(fs.create(path, false), digest));
@@ -92,10 +90,12 @@ public class TestAliyunOSSFileSystemStore {
     assertTrue("Exists", fs.exists(path));
 
     Path copyPath = path.suffix(".copy");
+    long start = System.currentTimeMillis();
     fs.rename(path, copyPath);
 
     assertTrue("Copy exists", fs.exists(copyPath));
-
+    // should take less than 1 second
+    assertTrue(System.currentTimeMillis() - start < 1000);
     // Download file from Aliyun OSS and compare the digest against the original
     MessageDigest digest2 = MessageDigest.getInstance("MD5");
     InputStream in = new BufferedInputStream(
@@ -119,7 +119,7 @@ public class TestAliyunOSSFileSystemStore {
   @Test
   public void testLargeUpload()
       throws IOException, NoSuchAlgorithmException {
-    // Multipart upload, multipart copy
-    writeRenameReadCompare(new Path("/test/xlarge"), 52428800L); // 50MB byte
+    // Multipart upload, shallow copy
+    writeRenameReadCompare(new Path("/test/xlarge"), 2147483648L); // 2GB
   }
 }

+ 0 - 1
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java

@@ -32,7 +32,6 @@ public class TestAliyunOSSContractDistCp extends AbstractContractDistCpTest {
   @Override
   protected Configuration createConfiguration() {
     Configuration newConf = super.createConfiguration();
-    newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING);
     newConf.setLong(MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_SETTING);
     return newConf;
   }