|
@@ -15,6 +15,7 @@
|
|
|
* See the License for the specific language governing permissions and
|
|
|
* limitations under the License.
|
|
|
*/
|
|
|
+
|
|
|
package org.apache.hadoop.fs.aliyun.oss;
|
|
|
|
|
|
import com.aliyun.oss.ClientConfiguration;
|
|
@@ -62,8 +63,11 @@ import java.io.File;
|
|
|
import java.io.FileInputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
+import java.io.Serializable;
|
|
|
import java.net.URI;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Comparator;
|
|
|
import java.util.List;
|
|
|
import java.util.ListIterator;
|
|
|
import java.util.NoSuchElementException;
|
|
@@ -83,7 +87,6 @@ public class AliyunOSSFileSystemStore {
|
|
|
private String bucketName;
|
|
|
private long uploadPartSize;
|
|
|
private long multipartThreshold;
|
|
|
- private long partSize;
|
|
|
private int maxKeys;
|
|
|
private String serverSideEncryptionAlgorithm;
|
|
|
|
|
@@ -143,28 +146,18 @@ public class AliyunOSSFileSystemStore {
|
|
|
String endPoint = conf.getTrimmed(ENDPOINT_KEY, "");
|
|
|
if (StringUtils.isEmpty(endPoint)) {
|
|
|
throw new IllegalArgumentException("Aliyun OSS endpoint should not be " +
|
|
|
- "null or empty. Please set proper endpoint with 'fs.oss.endpoint'.");
|
|
|
+ "null or empty. Please set proper endpoint with 'fs.oss.endpoint'.");
|
|
|
}
|
|
|
CredentialsProvider provider =
|
|
|
AliyunOSSUtils.getCredentialsProvider(conf);
|
|
|
ossClient = new OSSClient(endPoint, provider, clientConf);
|
|
|
- uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
|
|
|
- MULTIPART_UPLOAD_SIZE_DEFAULT);
|
|
|
+ uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf,
|
|
|
+ MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
|
|
|
multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
|
|
|
MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
|
|
|
- partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
|
|
|
- MULTIPART_UPLOAD_SIZE_DEFAULT);
|
|
|
- if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) {
|
|
|
- partSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
|
|
|
- }
|
|
|
serverSideEncryptionAlgorithm =
|
|
|
conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
|
|
|
|
|
|
- if (uploadPartSize < 5 * 1024 * 1024) {
|
|
|
- LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB");
|
|
|
- uploadPartSize = 5 * 1024 * 1024;
|
|
|
- }
|
|
|
-
|
|
|
if (multipartThreshold < 5 * 1024 * 1024) {
|
|
|
LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
|
|
|
multipartThreshold = 5 * 1024 * 1024;
|
|
@@ -419,71 +412,6 @@ public class AliyunOSSFileSystemStore {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Upload a file as an OSS object, using multipart upload.
|
|
|
- *
|
|
|
- * @param key object key.
|
|
|
- * @param file local file to upload.
|
|
|
- * @throws IOException if failed to upload object.
|
|
|
- */
|
|
|
- public void multipartUploadObject(String key, File file) throws IOException {
|
|
|
- File object = file.getAbsoluteFile();
|
|
|
- long dataLen = object.length();
|
|
|
- long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize);
|
|
|
- int partNum = (int) (dataLen / realPartSize);
|
|
|
- if (dataLen % realPartSize != 0) {
|
|
|
- partNum += 1;
|
|
|
- }
|
|
|
-
|
|
|
- InitiateMultipartUploadRequest initiateMultipartUploadRequest =
|
|
|
- new InitiateMultipartUploadRequest(bucketName, key);
|
|
|
- ObjectMetadata meta = new ObjectMetadata();
|
|
|
- if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
|
|
|
- meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
|
|
|
- }
|
|
|
- initiateMultipartUploadRequest.setObjectMetadata(meta);
|
|
|
- InitiateMultipartUploadResult initiateMultipartUploadResult =
|
|
|
- ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
|
|
|
- List<PartETag> partETags = new ArrayList<PartETag>();
|
|
|
- String uploadId = initiateMultipartUploadResult.getUploadId();
|
|
|
-
|
|
|
- try {
|
|
|
- for (int i = 0; i < partNum; i++) {
|
|
|
- //TODO Optimize this, avoid opening the object multiple times.
|
|
|
- FileInputStream fis = new FileInputStream(object);
|
|
|
- try {
|
|
|
- long skipBytes = realPartSize * i;
|
|
|
- AliyunOSSUtils.skipFully(fis, skipBytes);
|
|
|
- long size = (realPartSize < dataLen - skipBytes) ?
|
|
|
- realPartSize : dataLen - skipBytes;
|
|
|
- UploadPartRequest uploadPartRequest = new UploadPartRequest();
|
|
|
- uploadPartRequest.setBucketName(bucketName);
|
|
|
- uploadPartRequest.setKey(key);
|
|
|
- uploadPartRequest.setUploadId(uploadId);
|
|
|
- uploadPartRequest.setInputStream(fis);
|
|
|
- uploadPartRequest.setPartSize(size);
|
|
|
- uploadPartRequest.setPartNumber(i + 1);
|
|
|
- UploadPartResult uploadPartResult =
|
|
|
- ossClient.uploadPart(uploadPartRequest);
|
|
|
- statistics.incrementWriteOps(1);
|
|
|
- partETags.add(uploadPartResult.getPartETag());
|
|
|
- } finally {
|
|
|
- fis.close();
|
|
|
- }
|
|
|
- }
|
|
|
- CompleteMultipartUploadRequest completeMultipartUploadRequest =
|
|
|
- new CompleteMultipartUploadRequest(bucketName, key,
|
|
|
- uploadId, partETags);
|
|
|
- CompleteMultipartUploadResult completeMultipartUploadResult =
|
|
|
- ossClient.completeMultipartUpload(completeMultipartUploadRequest);
|
|
|
- LOG.debug(completeMultipartUploadResult.getETag());
|
|
|
- } catch (OSSException | ClientException e) {
|
|
|
- AbortMultipartUploadRequest abortMultipartUploadRequest =
|
|
|
- new AbortMultipartUploadRequest(bucketName, key, uploadId);
|
|
|
- ossClient.abortMultipartUpload(abortMultipartUploadRequest);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* list objects.
|
|
|
*
|
|
@@ -494,7 +422,7 @@ public class AliyunOSSFileSystemStore {
|
|
|
* @return a list of matches.
|
|
|
*/
|
|
|
public ObjectListing listObjects(String prefix, int maxListingLength,
|
|
|
- String marker, boolean recursive) {
|
|
|
+ String marker, boolean recursive) {
|
|
|
String delimiter = recursive ? null : "/";
|
|
|
prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix);
|
|
|
ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
|
|
@@ -657,4 +585,83 @@ public class AliyunOSSFileSystemStore {
|
|
|
}
|
|
|
};
|
|
|
}
|
|
|
+
|
|
|
+ public PartETag uploadPart(File file, String key, String uploadId, int idx)
|
|
|
+ throws IOException {
|
|
|
+ InputStream instream = null;
|
|
|
+ Exception caught = null;
|
|
|
+ int tries = 3;
|
|
|
+ while (tries > 0) {
|
|
|
+ try {
|
|
|
+ instream = new FileInputStream(file);
|
|
|
+ UploadPartRequest uploadRequest = new UploadPartRequest();
|
|
|
+ uploadRequest.setBucketName(bucketName);
|
|
|
+ uploadRequest.setKey(key);
|
|
|
+ uploadRequest.setUploadId(uploadId);
|
|
|
+ uploadRequest.setInputStream(instream);
|
|
|
+ uploadRequest.setPartSize(file.length());
|
|
|
+ uploadRequest.setPartNumber(idx);
|
|
|
+ UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest);
|
|
|
+ return uploadResult.getPartETag();
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.debug("Failed to upload "+ file.getPath() +", " +
|
|
|
+ "try again.", e);
|
|
|
+ caught = e;
|
|
|
+ } finally {
|
|
|
+ if (instream != null) {
|
|
|
+ instream.close();
|
|
|
+ instream = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ tries--;
|
|
|
+ }
|
|
|
+
|
|
|
+ assert (caught != null);
|
|
|
+ throw new IOException("Failed to upload " + file.getPath() +
|
|
|
+ " for 3 times.", caught);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Initiate multipart upload.
|
|
|
+ */
|
|
|
+ public String getUploadId(String key) {
|
|
|
+ InitiateMultipartUploadRequest initiateMultipartUploadRequest =
|
|
|
+ new InitiateMultipartUploadRequest(bucketName, key);
|
|
|
+ InitiateMultipartUploadResult initiateMultipartUploadResult =
|
|
|
+ ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
|
|
|
+ return initiateMultipartUploadResult.getUploadId();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Complete the specific multipart upload.
|
|
|
+ */
|
|
|
+ public CompleteMultipartUploadResult completeMultipartUpload(String key,
|
|
|
+ String uploadId, List<PartETag> partETags) {
|
|
|
+ Collections.sort(partETags, new PartNumberAscendComparator());
|
|
|
+ CompleteMultipartUploadRequest completeMultipartUploadRequest =
|
|
|
+ new CompleteMultipartUploadRequest(bucketName, key, uploadId,
|
|
|
+ partETags);
|
|
|
+ return ossClient.completeMultipartUpload(completeMultipartUploadRequest);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Abort the specific multipart upload.
|
|
|
+ */
|
|
|
+ public void abortMultipartUpload(String key, String uploadId) {
|
|
|
+ AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(
|
|
|
+ bucketName, key, uploadId);
|
|
|
+ ossClient.abortMultipartUpload(request);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class PartNumberAscendComparator
|
|
|
+ implements Comparator<PartETag>, Serializable {
|
|
|
+ @Override
|
|
|
+ public int compare(PartETag o1, PartETag o2) {
|
|
|
+ if (o1.getPartNumber() > o2.getPartNumber()) {
|
|
|
+ return 1;
|
|
|
+ } else {
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|