浏览代码

HADOOP-13768. AliyunOSS: handle the failure in the batch delete operation `deleteDirs`. Contributed by Genmao Yu

Kai Zheng 8 年之前
父节点
当前提交
5b151290ae

+ 38 - 9
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java

@@ -29,6 +29,7 @@ import com.aliyun.oss.model.CompleteMultipartUploadRequest;
 import com.aliyun.oss.model.CompleteMultipartUploadResult;
 import com.aliyun.oss.model.CompleteMultipartUploadResult;
 import com.aliyun.oss.model.CopyObjectResult;
 import com.aliyun.oss.model.CopyObjectResult;
 import com.aliyun.oss.model.DeleteObjectsRequest;
 import com.aliyun.oss.model.DeleteObjectsRequest;
+import com.aliyun.oss.model.DeleteObjectsResult;
 import com.aliyun.oss.model.GetObjectRequest;
 import com.aliyun.oss.model.GetObjectRequest;
 import com.aliyun.oss.model.InitiateMultipartUploadRequest;
 import com.aliyun.oss.model.InitiateMultipartUploadRequest;
 import com.aliyun.oss.model.InitiateMultipartUploadResult;
 import com.aliyun.oss.model.InitiateMultipartUploadResult;
@@ -183,14 +184,40 @@ public class AliyunOSSFileSystemStore {
    * Delete a list of keys, and update write operation statistics.
    * Delete a list of keys, and update write operation statistics.
    *
    *
    * @param keysToDelete collection of keys to delete.
    * @param keysToDelete collection of keys to delete.
+   * @throws IOException if failed to delete objects.
    */
    */
-  public void deleteObjects(List<String> keysToDelete) {
-    if (CollectionUtils.isNotEmpty(keysToDelete)) {
-      DeleteObjectsRequest deleteRequest =
-          new DeleteObjectsRequest(bucketName);
-      deleteRequest.setKeys(keysToDelete);
-      ossClient.deleteObjects(deleteRequest);
-      statistics.incrementWriteOps(keysToDelete.size());
+  public void deleteObjects(List<String> keysToDelete) throws IOException {
+    if (CollectionUtils.isEmpty(keysToDelete)) {
+      LOG.warn("Keys to delete is empty.");
+      return;
+    }
+
+    int retry = 10;
+    int tries = 0;
+    List<String> deleteFailed = keysToDelete;
+    while(CollectionUtils.isNotEmpty(deleteFailed)) {
+      DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucketName);
+      deleteRequest.setKeys(deleteFailed);
+      // There are two modes to do batch delete:
+      // 1. detail mode: DeleteObjectsResult.getDeletedObjects returns objects
+      // which were deleted successfully.
+      // 2. simple mode: DeleteObjectsResult.getDeletedObjects returns objects
+      // which were deleted unsuccessfully.
+      // Here, we choose the simple mode to do batch delete.
+      deleteRequest.setQuiet(true);
+      DeleteObjectsResult result = ossClient.deleteObjects(deleteRequest);
+      deleteFailed = result.getDeletedObjects();
+      tries++;
+      if (tries == retry) {
+        break;
+      }
+    }
+
+    if (tries == retry && CollectionUtils.isNotEmpty(deleteFailed)) {
+      // Most of time, it is impossible to try 10 times, expect the
+      // Aliyun OSS service problems.
+      throw new IOException("Failed to delete Aliyun OSS objects for " +
+          tries + " times.");
     }
     }
   }
   }
 
 
@@ -198,8 +225,9 @@ public class AliyunOSSFileSystemStore {
    * Delete a directory from Aliyun OSS.
    * Delete a directory from Aliyun OSS.
    *
    *
    * @param key directory key to delete.
    * @param key directory key to delete.
+   * @throws IOException if failed to delete directory.
    */
    */
-  public void deleteDirs(String key) {
+  public void deleteDirs(String key) throws IOException {
     key = AliyunOSSUtils.maybeAddTrailingSlash(key);
     key = AliyunOSSUtils.maybeAddTrailingSlash(key);
     ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
     ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
     listRequest.setPrefix(key);
     listRequest.setPrefix(key);
@@ -496,8 +524,9 @@ public class AliyunOSSFileSystemStore {
    * Clean up all objects matching the prefix.
    * Clean up all objects matching the prefix.
    *
    *
    * @param prefix Aliyun OSS object prefix.
    * @param prefix Aliyun OSS object prefix.
+   * @throws IOException if failed to clean up objects.
    */
    */
-  public void purge(String prefix) {
+  public void purge(String prefix) throws IOException {
     String key;
     String key;
     try {
     try {
       ObjectListing objects = listObjects(prefix, maxKeys, null, true);
       ObjectListing objects = listObjects(prefix, maxKeys, null, true);

+ 5 - 0
hadoop-tools/hadoop-aliyun/src/test/resources/contract/aliyun-oss.xml

@@ -112,4 +112,9 @@
         <name>fs.oss.multipart.download.size</name>
         <name>fs.oss.multipart.download.size</name>
         <value>102400</value>
         <value>102400</value>
     </property>
     </property>
+
+    <property>
+        <name>fs.contract.create-visibility-delayed</name>
+        <value>true</value>
+    </property>
 </configuration>
 </configuration>