|
@@ -38,6 +38,7 @@ import com.amazonaws.auth.AWSCredentialsProviderChain;
|
|
|
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
|
|
|
import com.amazonaws.services.s3.AmazonS3Client;
|
|
|
import com.amazonaws.services.s3.model.CannedAccessControlList;
|
|
|
+import com.amazonaws.services.s3.model.DeleteObjectRequest;
|
|
|
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
|
|
import com.amazonaws.services.s3.model.ListObjectsRequest;
|
|
|
import com.amazonaws.services.s3.model.ObjectListing;
|
|
@@ -82,6 +83,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
private String bucket;
|
|
|
private int maxKeys;
|
|
|
private long partSize;
|
|
|
+ private boolean enableMultiObjectsDelete;
|
|
|
private TransferManager transfers;
|
|
|
private ExecutorService threadPoolExecutor;
|
|
|
private long multiPartThreshold;
|
|
@@ -200,6 +202,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
|
|
|
multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
|
|
|
DEFAULT_MIN_MULTIPART_THRESHOLD);
|
|
|
+ enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
|
|
|
|
|
|
if (partSize < 5 * 1024 * 1024) {
|
|
|
LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
|
|
@@ -522,11 +525,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
copyFile(summary.getKey(), newDstKey);
|
|
|
|
|
|
if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
|
|
|
- DeleteObjectsRequest deleteRequest =
|
|
|
- new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
|
|
|
- s3.deleteObjects(deleteRequest);
|
|
|
- statistics.incrementWriteOps(1);
|
|
|
- keysToDelete.clear();
|
|
|
+ removeKeys(keysToDelete, true);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -534,11 +533,8 @@ public class S3AFileSystem extends FileSystem {
|
|
|
objects = s3.listNextBatchOfObjects(objects);
|
|
|
statistics.incrementReadOps(1);
|
|
|
} else {
|
|
|
- if (keysToDelete.size() > 0) {
|
|
|
- DeleteObjectsRequest deleteRequest =
|
|
|
- new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
|
|
|
- s3.deleteObjects(deleteRequest);
|
|
|
- statistics.incrementWriteOps(1);
|
|
|
+ if (!keysToDelete.isEmpty()) {
|
|
|
+ removeKeys(keysToDelete, false);
|
|
|
}
|
|
|
break;
|
|
|
}
|
|
@@ -552,6 +548,36 @@ public class S3AFileSystem extends FileSystem {
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * A helper method to delete a list of keys on a s3-backend.
|
|
|
+ *
|
|
|
+ * @param keysToDelete collection of keys to delete on the s3-backend
|
|
|
+ * @param clearKeys clears the keysToDelete-list after processing the list
|
|
|
+ * when set to true
|
|
|
+ */
|
|
|
+ private void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
|
|
|
+ boolean clearKeys) {
|
|
|
+ if (enableMultiObjectsDelete) {
|
|
|
+ DeleteObjectsRequest deleteRequest
|
|
|
+ = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
|
|
|
+ s3.deleteObjects(deleteRequest);
|
|
|
+ statistics.incrementWriteOps(1);
|
|
|
+ } else {
|
|
|
+ int writeops = 0;
|
|
|
+
|
|
|
+ for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
|
|
|
+ s3.deleteObject(
|
|
|
+ new DeleteObjectRequest(bucket, keyVersion.getKey()));
|
|
|
+ writeops++;
|
|
|
+ }
|
|
|
+
|
|
|
+ statistics.incrementWriteOps(writeops);
|
|
|
+ }
|
|
|
+ if (clearKeys) {
|
|
|
+ keysToDelete.clear();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/** Delete a file.
|
|
|
*
|
|
|
* @param f the path to delete.
|
|
@@ -626,11 +652,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
}
|
|
|
|
|
|
if (keys.size() == MAX_ENTRIES_TO_DELETE) {
|
|
|
- DeleteObjectsRequest deleteRequest =
|
|
|
- new DeleteObjectsRequest(bucket).withKeys(keys);
|
|
|
- s3.deleteObjects(deleteRequest);
|
|
|
- statistics.incrementWriteOps(1);
|
|
|
- keys.clear();
|
|
|
+ removeKeys(keys, true);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -639,10 +661,7 @@ public class S3AFileSystem extends FileSystem {
|
|
|
statistics.incrementReadOps(1);
|
|
|
} else {
|
|
|
if (!keys.isEmpty()) {
|
|
|
- DeleteObjectsRequest deleteRequest =
|
|
|
- new DeleteObjectsRequest(bucket).withKeys(keys);
|
|
|
- s3.deleteObjects(deleteRequest);
|
|
|
- statistics.incrementWriteOps(1);
|
|
|
+ removeKeys(keys, false);
|
|
|
}
|
|
|
break;
|
|
|
}
|