Sfoglia il codice sorgente

HADOOP-13498. The number of multi-part upload part should not bigger than 10000. Contributed by Genmao Yu.

Mingfei 8 anni fa
parent
commit
cdb77110e7

+ 4 - 5
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java

@@ -71,7 +71,6 @@ import org.slf4j.LoggerFactory;
  * Aliyun OSS</a>, used to access OSS blob system in a filesystem style.
  */
 public class AliyunOSSFileSystem extends FileSystem {
-
   private static final Logger LOG =
       LoggerFactory.getLogger(AliyunOSSFileSystem.class);
   private URI uri;
@@ -560,18 +559,18 @@ public class AliyunOSSFileSystem extends FileSystem {
    * Used to create an empty file that represents an empty directory.
    *
    * @param bucket the bucket this directory belongs to
-   * @param objectName directory path
+   * @param key directory path
    * @return true if directory successfully created
    * @throws IOException
    */
-  private boolean mkdir(final String bucket, final String objectName)
+  private boolean mkdir(final String bucket, final String key)
       throws IOException {
-    String dirName = objectName;
+    String dirName = key;
     ObjectMetadata dirMeta = new ObjectMetadata();
     byte[] buffer = new byte[0];
     ByteArrayInputStream in = new ByteArrayInputStream(buffer);
     dirMeta.setContentLength(0);
-    if (!objectName.endsWith("/")) {
+    if (!key.endsWith("/")) {
       dirName += "/";
     }
     try {

+ 12 - 11
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java

@@ -84,6 +84,9 @@ public class AliyunOSSOutputStream extends OutputStream {
 
     partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
         MULTIPART_UPLOAD_SIZE_DEFAULT);
+    if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) {
+      partSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
+    }
     partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
         MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
 
@@ -151,6 +154,12 @@ public class AliyunOSSOutputStream extends OutputStream {
   private void multipartUploadObject() throws IOException {
     File object = tmpFile.getAbsoluteFile();
     long dataLen = object.length();
+    long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize);
+    int partNum = (int)(dataLen / realPartSize);
+    if (dataLen % realPartSize != 0) {
+      partNum += 1;
+    }
+
     InitiateMultipartUploadRequest initiateMultipartUploadRequest =
         new InitiateMultipartUploadRequest(bucketName, key);
     ObjectMetadata meta = new ObjectMetadata();
@@ -161,14 +170,6 @@ public class AliyunOSSOutputStream extends OutputStream {
     initiateMultipartUploadRequest.setObjectMetadata(meta);
     InitiateMultipartUploadResult initiateMultipartUploadResult =
         ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
-    int partNum = (int)(dataLen / partSize);
-    if (dataLen % partSize != 0) {
-      partNum += 1;
-    }
-    if (partNum > MULTIPART_UPLOAD_PART_NUM_LIMIT) {
-      throw new IOException("Number of parts " + partNum + " should not be " +
-          "bigger than limit " + MULTIPART_UPLOAD_PART_NUM_LIMIT);
-    }
     List<PartETag> partETags = new ArrayList<PartETag>();
     String uploadId = initiateMultipartUploadResult.getUploadId();
 
@@ -177,10 +178,10 @@ public class AliyunOSSOutputStream extends OutputStream {
         // TODO: Optimize this, avoid opening the object multiple times
         FileInputStream fis = new FileInputStream(object);
         try {
-          long skipBytes = partSize * i;
+          long skipBytes = realPartSize * i;
           AliyunOSSUtils.skipFully(fis, skipBytes);
-          long size = (partSize < dataLen - skipBytes) ?
-              partSize : dataLen - skipBytes;
+          long size = (realPartSize < dataLen - skipBytes) ?
+              realPartSize : dataLen - skipBytes;
           UploadPartRequest uploadPartRequest = new UploadPartRequest();
           uploadPartRequest.setBucketName(bucketName);
           uploadPartRequest.setKey(key);

+ 15 - 0
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java

@@ -28,6 +28,8 @@ import java.util.Objects;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
+import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT;
+
 /**
  * Utility methods for Aliyun OSS code.
  */
@@ -172,4 +174,17 @@ final public class AliyunOSSUtils {
               "to EOF.");
     }
   }
+
+  /**
+   * Calculate a proper size of multipart piece. If <code>minPartSize</code>
+   * is too small, the number of multipart pieces may exceed the limit of
+   * {@link Constants#MULTIPART_UPLOAD_PART_NUM_LIMIT}.
+   * @param contentLength the size of file.
+   * @param minPartSize the minimum size of multipart piece.
+   * @return a revisional size of multipart piece.
+     */
+  public static long calculatePartSize(long contentLength, long minPartSize) {
+    long tmpPartSize = contentLength / MULTIPART_UPLOAD_PART_NUM_LIMIT + 1;
+    return Math.max(minPartSize, tmpPartSize);
+  }
 }

+ 3 - 1
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java

@@ -79,7 +79,7 @@ public final class Constants {
       "fs.oss.multipart.upload.size";
 
   public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024;
-  public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 1000;
+  public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 10000;
 
   // Minimum size in bytes before we start a multipart uploads or copy
   public static final String MIN_MULTIPART_UPLOAD_THRESHOLD_KEY =
@@ -108,4 +108,6 @@ public final class Constants {
   public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024;
   public static final String FS_OSS = "oss";
 
+  public static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 100 * 1024L;
+
 }

+ 19 - 0
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSOutputStream.java

@@ -68,4 +68,23 @@ public class TestOSSOutputStream {
   public void testMultiPartUpload() throws IOException {
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024);
   }
+
+  @Test
+  public void testMultiPartUploadLimit() throws IOException {
+    long partSize1 = AliyunOSSUtils.calculatePartSize(10 * 1024, 100 * 1024);
+    assert(10 * 1024 / partSize1 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+
+    long partSize2 = AliyunOSSUtils.calculatePartSize(200 * 1024, 100 * 1024);
+    assert(200 * 1024 / partSize2 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+
+    long partSize3 = AliyunOSSUtils.calculatePartSize(10000 * 100 * 1024,
+        100 * 1024);
+    assert(10000 * 100 * 1024 / partSize3
+        < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+
+    long partSize4 = AliyunOSSUtils.calculatePartSize(10001 * 100 * 1024,
+        100 * 1024);
+    assert(10001 * 100 * 1024 / partSize4
+        < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+  }
 }