|
@@ -28,6 +28,9 @@ import java.io.FileNotFoundException;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.InputStream;
|
|
import java.io.InputStream;
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
|
|
+import java.util.ArrayList;
|
|
|
|
+import java.util.Collections;
|
|
|
|
+import java.util.List;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -41,10 +44,13 @@ import org.jets3t.service.S3ServiceException;
|
|
import org.jets3t.service.ServiceException;
|
|
import org.jets3t.service.ServiceException;
|
|
import org.jets3t.service.StorageObjectsChunk;
|
|
import org.jets3t.service.StorageObjectsChunk;
|
|
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
|
|
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
|
|
|
|
+import org.jets3t.service.model.MultipartPart;
|
|
|
|
+import org.jets3t.service.model.MultipartUpload;
|
|
import org.jets3t.service.model.S3Bucket;
|
|
import org.jets3t.service.model.S3Bucket;
|
|
import org.jets3t.service.model.S3Object;
|
|
import org.jets3t.service.model.S3Object;
|
|
import org.jets3t.service.model.StorageObject;
|
|
import org.jets3t.service.model.StorageObject;
|
|
import org.jets3t.service.security.AWSCredentials;
|
|
import org.jets3t.service.security.AWSCredentials;
|
|
|
|
+import org.jets3t.service.utils.MultipartUtils;
|
|
|
|
|
|
@InterfaceAudience.Private
|
|
@InterfaceAudience.Private
|
|
@InterfaceStability.Unstable
|
|
@InterfaceStability.Unstable
|
|
@@ -52,6 +58,12 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
|
|
|
|
|
|
private S3Service s3Service;
|
|
private S3Service s3Service;
|
|
private S3Bucket bucket;
|
|
private S3Bucket bucket;
|
|
|
|
+
|
|
|
|
+ private long multipartBlockSize;
|
|
|
|
+ private boolean multipartEnabled;
|
|
|
|
+ private long multipartCopyBlockSize;
|
|
|
|
+ static final long MAX_PART_SIZE = (long)5 * 1024 * 1024 * 1024;
|
|
|
|
+
|
|
public static final Log LOG =
|
|
public static final Log LOG =
|
|
LogFactory.getLog(Jets3tNativeFileSystemStore.class);
|
|
LogFactory.getLog(Jets3tNativeFileSystemStore.class);
|
|
|
|
|
|
@@ -67,13 +79,27 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
|
|
} catch (S3ServiceException e) {
|
|
} catch (S3ServiceException e) {
|
|
handleS3ServiceException(e);
|
|
handleS3ServiceException(e);
|
|
}
|
|
}
|
|
|
|
+ multipartEnabled =
|
|
|
|
+ conf.getBoolean("fs.s3n.multipart.uploads.enabled", false);
|
|
|
|
+ multipartBlockSize = Math.min(
|
|
|
|
+ conf.getLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024),
|
|
|
|
+ MAX_PART_SIZE);
|
|
|
|
+ multipartCopyBlockSize = Math.min(
|
|
|
|
+ conf.getLong("fs.s3n.multipart.copy.block.size", MAX_PART_SIZE),
|
|
|
|
+ MAX_PART_SIZE);
|
|
|
|
+
|
|
bucket = new S3Bucket(uri.getHost());
|
|
bucket = new S3Bucket(uri.getHost());
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void storeFile(String key, File file, byte[] md5Hash)
|
|
public void storeFile(String key, File file, byte[] md5Hash)
|
|
throws IOException {
|
|
throws IOException {
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ if (multipartEnabled && file.length() >= multipartBlockSize) {
|
|
|
|
+ storeLargeFile(key, file, md5Hash);
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
BufferedInputStream in = null;
|
|
BufferedInputStream in = null;
|
|
try {
|
|
try {
|
|
in = new BufferedInputStream(new FileInputStream(file));
|
|
in = new BufferedInputStream(new FileInputStream(file));
|
|
@@ -98,6 +124,31 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public void storeLargeFile(String key, File file, byte[] md5Hash)
|
|
|
|
+ throws IOException {
|
|
|
|
+ S3Object object = new S3Object(key);
|
|
|
|
+ object.setDataInputFile(file);
|
|
|
|
+ object.setContentType("binary/octet-stream");
|
|
|
|
+ object.setContentLength(file.length());
|
|
|
|
+ if (md5Hash != null) {
|
|
|
|
+ object.setMd5Hash(md5Hash);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ List<StorageObject> objectsToUploadAsMultipart =
|
|
|
|
+ new ArrayList<StorageObject>();
|
|
|
|
+ objectsToUploadAsMultipart.add(object);
|
|
|
|
+ MultipartUtils mpUtils = new MultipartUtils(multipartBlockSize);
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ mpUtils.uploadObjects(bucket.getName(), s3Service,
|
|
|
|
+ objectsToUploadAsMultipart, null);
|
|
|
|
+ } catch (ServiceException e) {
|
|
|
|
+ handleServiceException(e);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ throw new S3Exception(e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public void storeEmptyFile(String key) throws IOException {
|
|
public void storeEmptyFile(String key) throws IOException {
|
|
try {
|
|
try {
|
|
@@ -152,11 +203,8 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
|
|
}
|
|
}
|
|
S3Object object = s3Service.getObject(bucket.getName(), key);
|
|
S3Object object = s3Service.getObject(bucket.getName(), key);
|
|
return object.getDataInputStream();
|
|
return object.getDataInputStream();
|
|
- } catch (S3ServiceException e) {
|
|
|
|
- handleS3ServiceException(key, e);
|
|
|
|
- return null; //never returned - keep compiler happy
|
|
|
|
} catch (ServiceException e) {
|
|
} catch (ServiceException e) {
|
|
- handleServiceException(e);
|
|
|
|
|
|
+ handleServiceException(key, e);
|
|
return null; //return null if key not found
|
|
return null; //return null if key not found
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -180,11 +228,8 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
|
|
S3Object object = s3Service.getObject(bucket, key, null, null, null,
|
|
S3Object object = s3Service.getObject(bucket, key, null, null, null,
|
|
null, byteRangeStart, null);
|
|
null, byteRangeStart, null);
|
|
return object.getDataInputStream();
|
|
return object.getDataInputStream();
|
|
- } catch (S3ServiceException e) {
|
|
|
|
- handleS3ServiceException(key, e);
|
|
|
|
- return null; //never returned - keep compiler happy
|
|
|
|
} catch (ServiceException e) {
|
|
} catch (ServiceException e) {
|
|
- handleServiceException(e);
|
|
|
|
|
|
+ handleServiceException(key, e);
|
|
return null; //return null if key not found
|
|
return null; //return null if key not found
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -244,8 +289,16 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
|
|
LOG.debug("Deleting key:" + key + "from bucket" + bucket.getName());
|
|
LOG.debug("Deleting key:" + key + "from bucket" + bucket.getName());
|
|
}
|
|
}
|
|
s3Service.deleteObject(bucket, key);
|
|
s3Service.deleteObject(bucket, key);
|
|
- } catch (S3ServiceException e) {
|
|
|
|
- handleS3ServiceException(key, e);
|
|
|
|
|
|
+ } catch (ServiceException e) {
|
|
|
|
+ handleServiceException(key, e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void rename(String srcKey, String dstKey) throws IOException {
|
|
|
|
+ try {
|
|
|
|
+ s3Service.renameObject(bucket.getName(), srcKey, new S3Object(dstKey));
|
|
|
|
+ } catch (ServiceException e) {
|
|
|
|
+ handleServiceException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -255,10 +308,52 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
|
|
if(LOG.isDebugEnabled()) {
|
|
if(LOG.isDebugEnabled()) {
|
|
LOG.debug("Copying srcKey: " + srcKey + "to dstKey: " + dstKey + "in bucket: " + bucket.getName());
|
|
LOG.debug("Copying srcKey: " + srcKey + "to dstKey: " + dstKey + "in bucket: " + bucket.getName());
|
|
}
|
|
}
|
|
|
|
+ if (multipartEnabled) {
|
|
|
|
+ S3Object object = s3Service.getObjectDetails(bucket, srcKey, null,
|
|
|
|
+ null, null, null);
|
|
|
|
+ if (multipartCopyBlockSize > 0 &&
|
|
|
|
+ object.getContentLength() > multipartCopyBlockSize) {
|
|
|
|
+ copyLargeFile(object, dstKey);
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(),
|
|
s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(),
|
|
new S3Object(dstKey), false);
|
|
new S3Object(dstKey), false);
|
|
- } catch (S3ServiceException e) {
|
|
|
|
- handleS3ServiceException(srcKey, e);
|
|
|
|
|
|
+ } catch (ServiceException e) {
|
|
|
|
+ handleServiceException(srcKey, e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void copyLargeFile(S3Object srcObject, String dstKey) throws IOException {
|
|
|
|
+ try {
|
|
|
|
+ long partCount = srcObject.getContentLength() / multipartCopyBlockSize +
|
|
|
|
+ (srcObject.getContentLength() % multipartCopyBlockSize > 0 ? 1 : 0);
|
|
|
|
+
|
|
|
|
+ MultipartUpload multipartUpload = s3Service.multipartStartUpload
|
|
|
|
+ (bucket.getName(), dstKey, srcObject.getMetadataMap());
|
|
|
|
+
|
|
|
|
+ List<MultipartPart> listedParts = new ArrayList<MultipartPart>();
|
|
|
|
+ for (int i = 0; i < partCount; i++) {
|
|
|
|
+ long byteRangeStart = i * multipartCopyBlockSize;
|
|
|
|
+ long byteLength;
|
|
|
|
+ if (i < partCount - 1) {
|
|
|
|
+ byteLength = multipartCopyBlockSize;
|
|
|
|
+ } else {
|
|
|
|
+ byteLength = srcObject.getContentLength() % multipartCopyBlockSize;
|
|
|
|
+ if (byteLength == 0) {
|
|
|
|
+ byteLength = multipartCopyBlockSize;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ MultipartPart copiedPart = s3Service.multipartUploadPartCopy
|
|
|
|
+ (multipartUpload, i + 1, bucket.getName(), srcObject.getKey(),
|
|
|
|
+ null, null, null, null, byteRangeStart,
|
|
|
|
+ byteRangeStart + byteLength - 1, null);
|
|
|
|
+ listedParts.add(copiedPart);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ Collections.reverse(listedParts);
|
|
|
|
+ s3Service.multipartCompleteUpload(multipartUpload, listedParts);
|
|
} catch (ServiceException e) {
|
|
} catch (ServiceException e) {
|
|
handleServiceException(e);
|
|
handleServiceException(e);
|
|
}
|
|
}
|
|
@@ -291,11 +386,11 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
|
|
System.out.println(sb);
|
|
System.out.println(sb);
|
|
}
|
|
}
|
|
|
|
|
|
- private void handleS3ServiceException(String key, S3ServiceException e) throws IOException {
|
|
|
|
- if ("NoSuchKey".equals(e.getS3ErrorCode())) {
|
|
|
|
|
|
+ private void handleServiceException(String key, ServiceException e) throws IOException {
|
|
|
|
+ if ("NoSuchKey".equals(e.getErrorCode())) {
|
|
throw new FileNotFoundException("Key '" + key + "' does not exist in S3");
|
|
throw new FileNotFoundException("Key '" + key + "' does not exist in S3");
|
|
} else {
|
|
} else {
|
|
- handleS3ServiceException(e);
|
|
|
|
|
|
+ handleServiceException(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|