|
@@ -87,7 +87,7 @@ public class S3AFileSystem extends FileSystem {
|
|
private long partSize;
|
|
private long partSize;
|
|
private TransferManager transfers;
|
|
private TransferManager transfers;
|
|
private ThreadPoolExecutor threadPoolExecutor;
|
|
private ThreadPoolExecutor threadPoolExecutor;
|
|
- private int multiPartThreshold;
|
|
|
|
|
|
+ private long multiPartThreshold;
|
|
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
|
|
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
|
|
private CannedAccessControlList cannedACL;
|
|
private CannedAccessControlList cannedACL;
|
|
private String serverSideEncryptionAlgorithm;
|
|
private String serverSideEncryptionAlgorithm;
|
|
@@ -191,8 +191,12 @@ public class S3AFileSystem extends FileSystem {
|
|
DEFAULT_ESTABLISH_TIMEOUT));
|
|
DEFAULT_ESTABLISH_TIMEOUT));
|
|
awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
|
|
awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
|
|
DEFAULT_SOCKET_TIMEOUT));
|
|
DEFAULT_SOCKET_TIMEOUT));
|
|
|
|
+ String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
|
|
|
|
+ if(!signerOverride.isEmpty()) {
|
|
|
|
+ awsConf.setSignerOverride(signerOverride);
|
|
|
|
+ }
|
|
|
|
|
|
- String proxyHost = conf.getTrimmed(PROXY_HOST,"");
|
|
|
|
|
|
+ String proxyHost = conf.getTrimmed(PROXY_HOST, "");
|
|
int proxyPort = conf.getInt(PROXY_PORT, -1);
|
|
int proxyPort = conf.getInt(PROXY_PORT, -1);
|
|
if (!proxyHost.isEmpty()) {
|
|
if (!proxyHost.isEmpty()) {
|
|
awsConf.setProxyHost(proxyHost);
|
|
awsConf.setProxyHost(proxyHost);
|
|
@@ -246,7 +250,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
|
|
|
maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
|
|
maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
|
|
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
|
|
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
|
|
- multiPartThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD,
|
|
|
|
|
|
+ multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
|
|
DEFAULT_MIN_MULTIPART_THRESHOLD);
|
|
DEFAULT_MIN_MULTIPART_THRESHOLD);
|
|
|
|
|
|
if (partSize < 5 * 1024 * 1024) {
|
|
if (partSize < 5 * 1024 * 1024) {
|
|
@@ -403,7 +407,7 @@ public class S3AFileSystem extends FileSystem {
|
|
if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
|
|
if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
|
|
return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket,
|
|
return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket,
|
|
key, progress, statistics, cannedACL,
|
|
key, progress, statistics, cannedACL,
|
|
- serverSideEncryptionAlgorithm, partSize, (long)multiPartThreshold,
|
|
|
|
|
|
+ serverSideEncryptionAlgorithm, partSize, multiPartThreshold,
|
|
threadPoolExecutor), statistics);
|
|
threadPoolExecutor), statistics);
|
|
}
|
|
}
|
|
// We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
|
|
// We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
|
|
@@ -1027,7 +1031,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
|
|
|
final ObjectMetadata om = new ObjectMetadata();
|
|
final ObjectMetadata om = new ObjectMetadata();
|
|
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
|
|
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
|
|
- om.setServerSideEncryption(serverSideEncryptionAlgorithm);
|
|
|
|
|
|
+ om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
|
|
}
|
|
}
|
|
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile);
|
|
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile);
|
|
putObjectRequest.setCannedAcl(cannedACL);
|
|
putObjectRequest.setCannedAcl(cannedACL);
|
|
@@ -1035,8 +1039,8 @@ public class S3AFileSystem extends FileSystem {
|
|
|
|
|
|
ProgressListener progressListener = new ProgressListener() {
|
|
ProgressListener progressListener = new ProgressListener() {
|
|
public void progressChanged(ProgressEvent progressEvent) {
|
|
public void progressChanged(ProgressEvent progressEvent) {
|
|
- switch (progressEvent.getEventCode()) {
|
|
|
|
- case ProgressEvent.PART_COMPLETED_EVENT_CODE:
|
|
|
|
|
|
+ switch (progressEvent.getEventType()) {
|
|
|
|
+ case TRANSFER_PART_COMPLETED_EVENT:
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
break;
|
|
break;
|
|
default:
|
|
default:
|
|
@@ -1091,7 +1095,7 @@ public class S3AFileSystem extends FileSystem {
|
|
ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
|
|
ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
|
|
final ObjectMetadata dstom = srcom.clone();
|
|
final ObjectMetadata dstom = srcom.clone();
|
|
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
|
|
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
|
|
- dstom.setServerSideEncryption(serverSideEncryptionAlgorithm);
|
|
|
|
|
|
+ dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
|
|
}
|
|
}
|
|
CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
|
|
CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
|
|
copyObjectRequest.setCannedAccessControlList(cannedACL);
|
|
copyObjectRequest.setCannedAccessControlList(cannedACL);
|
|
@@ -1099,8 +1103,8 @@ public class S3AFileSystem extends FileSystem {
|
|
|
|
|
|
ProgressListener progressListener = new ProgressListener() {
|
|
ProgressListener progressListener = new ProgressListener() {
|
|
public void progressChanged(ProgressEvent progressEvent) {
|
|
public void progressChanged(ProgressEvent progressEvent) {
|
|
- switch (progressEvent.getEventCode()) {
|
|
|
|
- case ProgressEvent.PART_COMPLETED_EVENT_CODE:
|
|
|
|
|
|
+ switch (progressEvent.getEventType()) {
|
|
|
|
+ case TRANSFER_PART_COMPLETED_EVENT:
|
|
statistics.incrementWriteOps(1);
|
|
statistics.incrementWriteOps(1);
|
|
break;
|
|
break;
|
|
default:
|
|
default:
|
|
@@ -1187,7 +1191,7 @@ public class S3AFileSystem extends FileSystem {
|
|
final ObjectMetadata om = new ObjectMetadata();
|
|
final ObjectMetadata om = new ObjectMetadata();
|
|
om.setContentLength(0L);
|
|
om.setContentLength(0L);
|
|
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
|
|
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
|
|
- om.setServerSideEncryption(serverSideEncryptionAlgorithm);
|
|
|
|
|
|
+ om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
|
|
}
|
|
}
|
|
PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, im, om);
|
|
PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, im, om);
|
|
putObjectRequest.setCannedAcl(cannedACL);
|
|
putObjectRequest.setCannedAcl(cannedACL);
|