|
@@ -0,0 +1,516 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ * <p/>
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ * <p/>
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.fs.aliyun.oss;
|
|
|
+
|
|
|
+import com.aliyun.oss.ClientConfiguration;
|
|
|
+import com.aliyun.oss.ClientException;
|
|
|
+import com.aliyun.oss.OSSClient;
|
|
|
+import com.aliyun.oss.OSSException;
|
|
|
+import com.aliyun.oss.common.auth.CredentialsProvider;
|
|
|
+import com.aliyun.oss.common.comm.Protocol;
|
|
|
+import com.aliyun.oss.model.AbortMultipartUploadRequest;
|
|
|
+import com.aliyun.oss.model.CannedAccessControlList;
|
|
|
+import com.aliyun.oss.model.CompleteMultipartUploadRequest;
|
|
|
+import com.aliyun.oss.model.CompleteMultipartUploadResult;
|
|
|
+import com.aliyun.oss.model.CopyObjectResult;
|
|
|
+import com.aliyun.oss.model.DeleteObjectsRequest;
|
|
|
+import com.aliyun.oss.model.GetObjectRequest;
|
|
|
+import com.aliyun.oss.model.InitiateMultipartUploadRequest;
|
|
|
+import com.aliyun.oss.model.InitiateMultipartUploadResult;
|
|
|
+import com.aliyun.oss.model.ListObjectsRequest;
|
|
|
+import com.aliyun.oss.model.ObjectMetadata;
|
|
|
+import com.aliyun.oss.model.ObjectListing;
|
|
|
+import com.aliyun.oss.model.OSSObjectSummary;
|
|
|
+import com.aliyun.oss.model.PartETag;
|
|
|
+import com.aliyun.oss.model.PutObjectResult;
|
|
|
+import com.aliyun.oss.model.UploadPartCopyRequest;
|
|
|
+import com.aliyun.oss.model.UploadPartCopyResult;
|
|
|
+import com.aliyun.oss.model.UploadPartRequest;
|
|
|
+import com.aliyun.oss.model.UploadPartResult;
|
|
|
+import org.apache.commons.collections.CollectionUtils;
|
|
|
+import org.apache.commons.lang.StringUtils;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import java.io.ByteArrayInputStream;
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileInputStream;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.net.URI;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Core implementation of Aliyun OSS Filesystem for Hadoop.
|
|
|
+ * Provides the bridging logic between Hadoop's abstract filesystem and
|
|
|
+ * Aliyun OSS.
|
|
|
+ */
|
|
|
+public class AliyunOSSFileSystemStore {
|
|
|
+ public static final Logger LOG =
|
|
|
+ LoggerFactory.getLogger(AliyunOSSFileSystemStore.class);
|
|
|
+ private FileSystem.Statistics statistics;
|
|
|
+ private OSSClient ossClient;
|
|
|
+ private String bucketName;
|
|
|
+ private long uploadPartSize;
|
|
|
+ private long multipartThreshold;
|
|
|
+ private long partSize;
|
|
|
+ private int maxKeys;
|
|
|
+ private String serverSideEncryptionAlgorithm;
|
|
|
+
|
|
|
+ public void initialize(URI uri, Configuration conf,
|
|
|
+ FileSystem.Statistics stat) throws IOException {
|
|
|
+ statistics = stat;
|
|
|
+ ClientConfiguration clientConf = new ClientConfiguration();
|
|
|
+ clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY,
|
|
|
+ MAXIMUM_CONNECTIONS_DEFAULT));
|
|
|
+ boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY,
|
|
|
+ SECURE_CONNECTIONS_DEFAULT);
|
|
|
+ clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
|
|
|
+ clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY,
|
|
|
+ MAX_ERROR_RETRIES_DEFAULT));
|
|
|
+ clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY,
|
|
|
+ ESTABLISH_TIMEOUT_DEFAULT));
|
|
|
+ clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY,
|
|
|
+ SOCKET_TIMEOUT_DEFAULT));
|
|
|
+
|
|
|
+ String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, "");
|
|
|
+ int proxyPort = conf.getInt(PROXY_PORT_KEY, -1);
|
|
|
+ if (StringUtils.isNotEmpty(proxyHost)) {
|
|
|
+ clientConf.setProxyHost(proxyHost);
|
|
|
+ if (proxyPort >= 0) {
|
|
|
+ clientConf.setProxyPort(proxyPort);
|
|
|
+ } else {
|
|
|
+ if (secureConnections) {
|
|
|
+ LOG.warn("Proxy host set without port. Using HTTPS default 443");
|
|
|
+ clientConf.setProxyPort(443);
|
|
|
+ } else {
|
|
|
+ LOG.warn("Proxy host set without port. Using HTTP default 80");
|
|
|
+ clientConf.setProxyPort(80);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY);
|
|
|
+ String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY);
|
|
|
+ if ((proxyUsername == null) != (proxyPassword == null)) {
|
|
|
+ String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " +
|
|
|
+ PROXY_PASSWORD_KEY + " set without the other.";
|
|
|
+ LOG.error(msg);
|
|
|
+ throw new IllegalArgumentException(msg);
|
|
|
+ }
|
|
|
+ clientConf.setProxyUsername(proxyUsername);
|
|
|
+ clientConf.setProxyPassword(proxyPassword);
|
|
|
+ clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY));
|
|
|
+ clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY));
|
|
|
+ } else if (proxyPort >= 0) {
|
|
|
+ String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " +
|
|
|
+ PROXY_HOST_KEY;
|
|
|
+ LOG.error(msg);
|
|
|
+ throw new IllegalArgumentException(msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ String endPoint = conf.getTrimmed(ENDPOINT_KEY, "");
|
|
|
+ CredentialsProvider provider =
|
|
|
+ AliyunOSSUtils.getCredentialsProvider(conf);
|
|
|
+ ossClient = new OSSClient(endPoint, provider, clientConf);
|
|
|
+ uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
|
|
|
+ MULTIPART_UPLOAD_SIZE_DEFAULT);
|
|
|
+ multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
|
|
|
+ MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
|
|
|
+ partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
|
|
|
+ MULTIPART_UPLOAD_SIZE_DEFAULT);
|
|
|
+ if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) {
|
|
|
+ partSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
|
|
|
+ }
|
|
|
+ serverSideEncryptionAlgorithm =
|
|
|
+ conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
|
|
|
+
|
|
|
+ if (uploadPartSize < 5 * 1024 * 1024) {
|
|
|
+ LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB");
|
|
|
+ uploadPartSize = 5 * 1024 * 1024;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (multipartThreshold < 5 * 1024 * 1024) {
|
|
|
+ LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
|
|
|
+ multipartThreshold = 5 * 1024 * 1024;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (multipartThreshold > 1024 * 1024 * 1024) {
|
|
|
+ LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB");
|
|
|
+ multipartThreshold = 1024 * 1024 * 1024;
|
|
|
+ }
|
|
|
+
|
|
|
+ String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
|
|
|
+ if (StringUtils.isNotEmpty(cannedACLName)) {
|
|
|
+ CannedAccessControlList cannedACL =
|
|
|
+ CannedAccessControlList.valueOf(cannedACLName);
|
|
|
+ ossClient.setBucketAcl(bucketName, cannedACL);
|
|
|
+ }
|
|
|
+
|
|
|
+ maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
|
|
|
+ bucketName = uri.getHost();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Delete an object, and update write operation statistics.
|
|
|
+ *
|
|
|
+ * @param key key to blob to delete.
|
|
|
+ */
|
|
|
+ public void deleteObject(String key) {
|
|
|
+ ossClient.deleteObject(bucketName, key);
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Delete a list of keys, and update write operation statistics.
|
|
|
+ *
|
|
|
+ * @param keysToDelete collection of keys to delete.
|
|
|
+ */
|
|
|
+ public void deleteObjects(List<String> keysToDelete) {
|
|
|
+ if (CollectionUtils.isNotEmpty(keysToDelete)) {
|
|
|
+ DeleteObjectsRequest deleteRequest =
|
|
|
+ new DeleteObjectsRequest(bucketName);
|
|
|
+ deleteRequest.setKeys(keysToDelete);
|
|
|
+ ossClient.deleteObjects(deleteRequest);
|
|
|
+ statistics.incrementWriteOps(keysToDelete.size());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Delete a directory from Aliyun OSS.
|
|
|
+ *
|
|
|
+ * @param key directory key to delete.
|
|
|
+ */
|
|
|
+ public void deleteDirs(String key) {
|
|
|
+ key = AliyunOSSUtils.maybeAddTrailingSlash(key);
|
|
|
+ ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
|
|
|
+ listRequest.setPrefix(key);
|
|
|
+ listRequest.setDelimiter(null);
|
|
|
+ listRequest.setMaxKeys(maxKeys);
|
|
|
+
|
|
|
+ while (true) {
|
|
|
+ ObjectListing objects = ossClient.listObjects(listRequest);
|
|
|
+ statistics.incrementReadOps(1);
|
|
|
+ List<String> keysToDelete = new ArrayList<String>();
|
|
|
+ for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
|
|
|
+ keysToDelete.add(objectSummary.getKey());
|
|
|
+ }
|
|
|
+ deleteObjects(keysToDelete);
|
|
|
+ if (objects.isTruncated()) {
|
|
|
+ listRequest.setMarker(objects.getNextMarker());
|
|
|
+ } else {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Return metadata of a given object key.
|
|
|
+ *
|
|
|
+ * @param key object key.
|
|
|
+ * @return return null if key does not exist.
|
|
|
+ */
|
|
|
+ public ObjectMetadata getObjectMetadata(String key) {
|
|
|
+ try {
|
|
|
+ return ossClient.getObjectMetadata(bucketName, key);
|
|
|
+ } catch (OSSException osse) {
|
|
|
+ return null;
|
|
|
+ } finally {
|
|
|
+ statistics.incrementReadOps(1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Upload an empty file as an OSS object, using single upload.
|
|
|
+ *
|
|
|
+ * @param key object key.
|
|
|
+ * @throws IOException if failed to upload object.
|
|
|
+ */
|
|
|
+ public void storeEmptyFile(String key) throws IOException {
|
|
|
+ ObjectMetadata dirMeta = new ObjectMetadata();
|
|
|
+ byte[] buffer = new byte[0];
|
|
|
+ ByteArrayInputStream in = new ByteArrayInputStream(buffer);
|
|
|
+ dirMeta.setContentLength(0);
|
|
|
+ try {
|
|
|
+ ossClient.putObject(bucketName, key, in, dirMeta);
|
|
|
+ } finally {
|
|
|
+ in.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Copy an object from source key to destination key.
|
|
|
+ *
|
|
|
+ * @param srcKey source key.
|
|
|
+ * @param dstKey destination key.
|
|
|
+ * @return true if file is successfully copied.
|
|
|
+ */
|
|
|
+ public boolean copyFile(String srcKey, String dstKey) {
|
|
|
+ ObjectMetadata objectMeta =
|
|
|
+ ossClient.getObjectMetadata(bucketName, srcKey);
|
|
|
+ long contentLength = objectMeta.getContentLength();
|
|
|
+ if (contentLength <= multipartThreshold) {
|
|
|
+ return singleCopy(srcKey, dstKey);
|
|
|
+ } else {
|
|
|
+ return multipartCopy(srcKey, contentLength, dstKey);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Use single copy to copy an OSS object.
|
|
|
+ * (The caller should make sure srcPath is a file and dstPath is valid)
|
|
|
+ *
|
|
|
+ * @param srcKey source key.
|
|
|
+ * @param dstKey destination key.
|
|
|
+ * @return true if object is successfully copied.
|
|
|
+ */
|
|
|
+ private boolean singleCopy(String srcKey, String dstKey) {
|
|
|
+ CopyObjectResult copyResult =
|
|
|
+ ossClient.copyObject(bucketName, srcKey, bucketName, dstKey);
|
|
|
+ LOG.debug(copyResult.getETag());
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Use multipart copy to copy an OSS object.
|
|
|
+ * (The caller should make sure srcPath is a file and dstPath is valid)
|
|
|
+ *
|
|
|
+ * @param srcKey source key.
|
|
|
+ * @param contentLength data size of the object to copy.
|
|
|
+ * @param dstKey destination key.
|
|
|
+ * @return true if success, or false if upload is aborted.
|
|
|
+ */
|
|
|
+ private boolean multipartCopy(String srcKey, long contentLength,
|
|
|
+ String dstKey) {
|
|
|
+ long realPartSize =
|
|
|
+ AliyunOSSUtils.calculatePartSize(contentLength, uploadPartSize);
|
|
|
+ int partNum = (int) (contentLength / realPartSize);
|
|
|
+ if (contentLength % realPartSize != 0) {
|
|
|
+ partNum++;
|
|
|
+ }
|
|
|
+ InitiateMultipartUploadRequest initiateMultipartUploadRequest =
|
|
|
+ new InitiateMultipartUploadRequest(bucketName, dstKey);
|
|
|
+ ObjectMetadata meta = new ObjectMetadata();
|
|
|
+ if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
|
|
|
+ meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
|
|
|
+ }
|
|
|
+ initiateMultipartUploadRequest.setObjectMetadata(meta);
|
|
|
+ InitiateMultipartUploadResult initiateMultipartUploadResult =
|
|
|
+ ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
|
|
|
+ String uploadId = initiateMultipartUploadResult.getUploadId();
|
|
|
+ List<PartETag> partETags = new ArrayList<PartETag>();
|
|
|
+ try {
|
|
|
+ for (int i = 0; i < partNum; i++) {
|
|
|
+ long skipBytes = realPartSize * i;
|
|
|
+ long size = (realPartSize < contentLength - skipBytes) ?
|
|
|
+ realPartSize : contentLength - skipBytes;
|
|
|
+ UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest();
|
|
|
+ partCopyRequest.setSourceBucketName(bucketName);
|
|
|
+ partCopyRequest.setSourceKey(srcKey);
|
|
|
+ partCopyRequest.setBucketName(bucketName);
|
|
|
+ partCopyRequest.setKey(dstKey);
|
|
|
+ partCopyRequest.setUploadId(uploadId);
|
|
|
+ partCopyRequest.setPartSize(size);
|
|
|
+ partCopyRequest.setBeginIndex(skipBytes);
|
|
|
+ partCopyRequest.setPartNumber(i + 1);
|
|
|
+ UploadPartCopyResult partCopyResult =
|
|
|
+ ossClient.uploadPartCopy(partCopyRequest);
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
+ partETags.add(partCopyResult.getPartETag());
|
|
|
+ }
|
|
|
+ CompleteMultipartUploadRequest completeMultipartUploadRequest =
|
|
|
+ new CompleteMultipartUploadRequest(bucketName, dstKey,
|
|
|
+ uploadId, partETags);
|
|
|
+ CompleteMultipartUploadResult completeMultipartUploadResult =
|
|
|
+ ossClient.completeMultipartUpload(completeMultipartUploadRequest);
|
|
|
+ LOG.debug(completeMultipartUploadResult.getETag());
|
|
|
+ return true;
|
|
|
+ } catch (OSSException | ClientException e) {
|
|
|
+ AbortMultipartUploadRequest abortMultipartUploadRequest =
|
|
|
+ new AbortMultipartUploadRequest(bucketName, dstKey, uploadId);
|
|
|
+ ossClient.abortMultipartUpload(abortMultipartUploadRequest);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Upload a file as an OSS object, using single upload.
|
|
|
+ *
|
|
|
+ * @param key object key.
|
|
|
+ * @param file local file to upload.
|
|
|
+ * @throws IOException if failed to upload object.
|
|
|
+ */
|
|
|
+ public void uploadObject(String key, File file) throws IOException {
|
|
|
+ File object = file.getAbsoluteFile();
|
|
|
+ FileInputStream fis = new FileInputStream(object);
|
|
|
+ ObjectMetadata meta = new ObjectMetadata();
|
|
|
+ meta.setContentLength(object.length());
|
|
|
+ if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
|
|
|
+ meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
|
|
|
+ LOG.debug(result.getETag());
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
+ } finally {
|
|
|
+ fis.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Upload a file as an OSS object, using multipart upload.
|
|
|
+ *
|
|
|
+ * @param key object key.
|
|
|
+ * @param file local file to upload.
|
|
|
+ * @throws IOException if failed to upload object.
|
|
|
+ */
|
|
|
+ public void multipartUploadObject(String key, File file) throws IOException {
|
|
|
+ File object = file.getAbsoluteFile();
|
|
|
+ long dataLen = object.length();
|
|
|
+ long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize);
|
|
|
+ int partNum = (int) (dataLen / realPartSize);
|
|
|
+ if (dataLen % realPartSize != 0) {
|
|
|
+ partNum += 1;
|
|
|
+ }
|
|
|
+
|
|
|
+ InitiateMultipartUploadRequest initiateMultipartUploadRequest =
|
|
|
+ new InitiateMultipartUploadRequest(bucketName, key);
|
|
|
+ ObjectMetadata meta = new ObjectMetadata();
|
|
|
+ if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
|
|
|
+ meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
|
|
|
+ }
|
|
|
+ initiateMultipartUploadRequest.setObjectMetadata(meta);
|
|
|
+ InitiateMultipartUploadResult initiateMultipartUploadResult =
|
|
|
+ ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
|
|
|
+ List<PartETag> partETags = new ArrayList<PartETag>();
|
|
|
+ String uploadId = initiateMultipartUploadResult.getUploadId();
|
|
|
+
|
|
|
+ try {
|
|
|
+ for (int i = 0; i < partNum; i++) {
|
|
|
+ // TODO: Optimize this, avoid opening the object multiple times
|
|
|
+ FileInputStream fis = new FileInputStream(object);
|
|
|
+ try {
|
|
|
+ long skipBytes = realPartSize * i;
|
|
|
+ AliyunOSSUtils.skipFully(fis, skipBytes);
|
|
|
+ long size = (realPartSize < dataLen - skipBytes) ?
|
|
|
+ realPartSize : dataLen - skipBytes;
|
|
|
+ UploadPartRequest uploadPartRequest = new UploadPartRequest();
|
|
|
+ uploadPartRequest.setBucketName(bucketName);
|
|
|
+ uploadPartRequest.setKey(key);
|
|
|
+ uploadPartRequest.setUploadId(uploadId);
|
|
|
+ uploadPartRequest.setInputStream(fis);
|
|
|
+ uploadPartRequest.setPartSize(size);
|
|
|
+ uploadPartRequest.setPartNumber(i + 1);
|
|
|
+ UploadPartResult uploadPartResult =
|
|
|
+ ossClient.uploadPart(uploadPartRequest);
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
+ partETags.add(uploadPartResult.getPartETag());
|
|
|
+ } finally {
|
|
|
+ fis.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ CompleteMultipartUploadRequest completeMultipartUploadRequest =
|
|
|
+ new CompleteMultipartUploadRequest(bucketName, key,
|
|
|
+ uploadId, partETags);
|
|
|
+ CompleteMultipartUploadResult completeMultipartUploadResult =
|
|
|
+ ossClient.completeMultipartUpload(completeMultipartUploadRequest);
|
|
|
+ LOG.debug(completeMultipartUploadResult.getETag());
|
|
|
+ } catch (OSSException | ClientException e) {
|
|
|
+ AbortMultipartUploadRequest abortMultipartUploadRequest =
|
|
|
+ new AbortMultipartUploadRequest(bucketName, key, uploadId);
|
|
|
+ ossClient.abortMultipartUpload(abortMultipartUploadRequest);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * list objects.
|
|
|
+ *
|
|
|
+ * @param prefix prefix.
|
|
|
+ * @param maxListingLength max no. of entries
|
|
|
+ * @param marker last key in any previous search.
|
|
|
+ * @param recursive whether to list directory recursively.
|
|
|
+ * @return a list of matches.
|
|
|
+ */
|
|
|
+ public ObjectListing listObjects(String prefix, int maxListingLength,
|
|
|
+ String marker, boolean recursive) {
|
|
|
+ String delimiter = recursive ? null : "/";
|
|
|
+ prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix);
|
|
|
+ ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
|
|
|
+ listRequest.setPrefix(prefix);
|
|
|
+ listRequest.setDelimiter(delimiter);
|
|
|
+ listRequest.setMaxKeys(maxListingLength);
|
|
|
+ listRequest.setMarker(marker);
|
|
|
+
|
|
|
+ ObjectListing listing = ossClient.listObjects(listRequest);
|
|
|
+ statistics.incrementReadOps(1);
|
|
|
+ return listing;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Retrieve a part of an object.
|
|
|
+ *
|
|
|
+ * @param key the object name that is being retrieved from the Aliyun OSS.
|
|
|
+ * @param byteStart start position.
|
|
|
+ * @param byteEnd end position.
|
|
|
+ * @return This method returns null if the key is not found.
|
|
|
+ */
|
|
|
+ public InputStream retrieve(String key, long byteStart, long byteEnd) {
|
|
|
+ try {
|
|
|
+ GetObjectRequest request = new GetObjectRequest(bucketName, key);
|
|
|
+ request.setRange(byteStart, byteEnd);
|
|
|
+ return ossClient.getObject(request).getObjectContent();
|
|
|
+ } catch (OSSException | ClientException e) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Close OSS client properly.
|
|
|
+ */
|
|
|
+ public void close() {
|
|
|
+ if (ossClient != null) {
|
|
|
+ ossClient.shutdown();
|
|
|
+ ossClient = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Clean up all objects matching the prefix.
|
|
|
+ *
|
|
|
+ * @param prefix Aliyun OSS object prefix.
|
|
|
+ */
|
|
|
+ public void purge(String prefix) {
|
|
|
+ String key;
|
|
|
+ try {
|
|
|
+ ObjectListing objects = listObjects(prefix, maxKeys, null, true);
|
|
|
+ for (OSSObjectSummary object : objects.getObjectSummaries()) {
|
|
|
+ key = object.getKey();
|
|
|
+ ossClient.deleteObject(bucketName, key);
|
|
|
+ }
|
|
|
+
|
|
|
+ for (String dir: objects.getCommonPrefixes()) {
|
|
|
+ deleteDirs(dir);
|
|
|
+ }
|
|
|
+ } catch (OSSException | ClientException e) {
|
|
|
+ LOG.error("Failed to purge " + prefix);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|