Ver código fonte

HDDS-930. Multipart Upload: Abort multiupload request. Contributed by Bharat Viswanadham.

Bharat Viswanadham 6 anos atrás
pai
commit
cdfbec47ce

+ 11 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java

@@ -386,6 +386,17 @@ public class OzoneBucket {
         partsMap);
   }
 
+  /**
+   * Abort multipart upload request.
+   * @param keyName
+   * @param uploadID
+   * @throws IOException
+   */
+  public void abortMultipartUpload(String keyName, String uploadID) throws
+      IOException {
+    proxy.abortMultipartUpload(volumeName, name, keyName, uploadID);
+  }
+
   /**
    * An Iterator to iterate over {@link OzoneKey} list.
    */

+ 11 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java

@@ -434,4 +434,15 @@ public interface ClientProtocol {
       String bucketName, String keyName, String uploadID,
       Map<Integer, String> partsMap) throws IOException;
 
+  /**
+   * Abort Multipart upload request for the given key with given uploadID.
+   * @param volumeName
+   * @param bucketName
+   * @param keyName
+   * @param uploadID
+   * @throws IOException
+   */
+  void abortMultipartUpload(String volumeName,
+      String bucketName, String keyName, String uploadID) throws IOException;
+
 }

+ 7 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java

@@ -985,4 +985,11 @@ public class RestClient implements ClientProtocol {
     throw new UnsupportedOperationException("Ozone REST protocol does not " +
         "support this operation.");
   }
+
+  @Override
+  public void abortMultipartUpload(String volumeName,
+       String bucketName, String keyName, String uploadID) throws IOException {
+    throw new UnsupportedOperationException("Ozone REST protocol does not " +
+        "support this operation.");
+  }
 }

+ 15 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -727,6 +727,7 @@ public class RpcClient implements ClientProtocol {
         .setDataSize(size)
         .setIsMultipartKey(true)
         .setMultipartUploadID(uploadID)
+        .setMultipartUploadPartNumber(partNumber)
         .build();
 
     OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
@@ -780,4 +781,18 @@ public class RpcClient implements ClientProtocol {
 
   }
 
+  @Override
+  public void abortMultipartUpload(String volumeName,
+       String bucketName, String keyName, String uploadID) throws IOException {
+    HddsClientUtils.verifyResourceName(volumeName, bucketName);
+    HddsClientUtils.checkNotNull(keyName, uploadID);
+    OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setMultipartUploadID(uploadID)
+        .build();
+    ozoneManagerClient.abortMultipartUpload(omKeyArgs);
+  }
+
 }

+ 7 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java

@@ -338,5 +338,12 @@ public interface OzoneManagerProtocol {
       OmKeyArgs omKeyArgs, OmMultipartUploadList multipartUploadList)
       throws IOException;
 
+  /**
+   * Abort multipart upload.
+   * @param omKeyArgs
+   * @throws IOException
+   */
+  void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException;
+
 }
 

+ 35 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java

@@ -90,6 +90,10 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartInfoInitiateRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartInfoInitiateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
+    MultipartUploadAbortRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartUploadAbortResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartUploadCompleteRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -633,6 +637,10 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
       keyArgs.setMultipartUploadID(args.getMultipartUploadID());
     }
 
+    if (args.getMultipartUploadPartNumber() > 0) {
+      keyArgs.setMultipartNumber(args.getMultipartUploadPartNumber());
+    }
+
     keyArgs.setIsMultipartKey(args.getIsMultipartKey());
 
 
@@ -1053,6 +1061,33 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
     return info;
   }
 
+  @Override
+  public void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException {
+    KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
+        .setVolumeName(omKeyArgs.getVolumeName())
+        .setBucketName(omKeyArgs.getBucketName())
+        .setKeyName(omKeyArgs.getKeyName())
+        .setMultipartUploadID(omKeyArgs.getMultipartUploadID());
+
+    MultipartUploadAbortRequest.Builder multipartUploadAbortRequest =
+        MultipartUploadAbortRequest.newBuilder();
+    multipartUploadAbortRequest.setKeyArgs(keyArgs);
+
+    OMRequest omRequest = createOMRequest(
+        Type.AbortMultiPartUpload)
+        .setAbortMultiPartUploadRequest(multipartUploadAbortRequest.build())
+        .build();
+
+    MultipartUploadAbortResponse response =
+        submitRequest(omRequest).getAbortMultiPartUploadResponse();
+
+    if (response.getStatus() != Status.OK) {
+      throw new IOException("Abort multipart upload failed, error:" +
+          response.getStatus());
+    }
+
+  }
+
   public List<ServiceInfo> getServiceList() throws IOException {
     ServiceListRequest req = ServiceListRequest.newBuilder().build();
 

+ 12 - 0
hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto

@@ -66,6 +66,7 @@ enum Type {
   InitiateMultiPartUpload = 45;
   CommitMultiPartUpload = 46;
   CompleteMultiPartUpload = 47;
+  AbortMultiPartUpload = 48;
 
   ServiceList = 51;
 }
@@ -107,6 +108,7 @@ message OMRequest {
   optional MultipartInfoInitiateRequest     initiateMultiPartUploadRequest = 45;
   optional MultipartCommitUploadPartRequest commitMultiPartUploadRequest   = 46;
   optional MultipartUploadCompleteRequest   completeMultiPartUploadRequest = 47;
+  optional MultipartUploadAbortRequest      abortMultiPartUploadRequest    = 48;
 
   optional ServiceListRequest               serviceListRequest             = 51;
 }
@@ -149,6 +151,7 @@ message OMResponse {
   optional MultipartInfoInitiateResponse   initiateMultiPartUploadResponse = 45;
   optional MultipartCommitUploadPartResponse commitMultiPartUploadResponse = 46;
   optional MultipartUploadCompleteResponse completeMultiPartUploadResponse = 47;
+  optional MultipartUploadAbortResponse    abortMultiPartUploadResponse    = 48;
 
   optional ServiceListResponse               ServiceListResponse           = 51;
 }
@@ -184,6 +187,7 @@ enum Status {
     MISSING_UPLOAD_PARTS = 28;
     COMPLETE_MULTIPART_UPLOAD_ERROR = 29;
     ENTITY_TOO_SMALL = 30;
+    ABORT_MULTIPART_UPLOAD_FAILED = 31;
 }
 
 
@@ -608,6 +612,14 @@ message Part {
     required string partName = 2;
 }
 
+message MultipartUploadAbortRequest {
+    required KeyArgs keyArgs = 1;
+}
+
+message MultipartUploadAbortResponse {
+    required Status status = 1;
+}
+
 /**
  The OM service that takes care of Ozone namespace.
 */

+ 62 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java

@@ -1665,6 +1665,68 @@ public class TestOzoneRpcClient {
     }
   }
 
+  @Test
+  public void testAbortUploadFail() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    try {
+      bucket.abortMultipartUpload(keyName, "random");
+      fail("testAbortUploadFail failed");
+    } catch (IOException ex) {
+      GenericTestUtils.assertExceptionContains(
+          "NO_SUCH_MULTIPART_UPLOAD_ERROR", ex);
+    }
+  }
+
+
+  @Test
+  public void testAbortUploadSuccessWithOutAnyParts() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    try {
+      String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
+          .STAND_ALONE, ReplicationFactor.ONE);
+      bucket.abortMultipartUpload(keyName, uploadID);
+    } catch (IOException ex) {
+      fail("testAbortUploadSuccess failed");
+    }
+  }
+
+  @Test
+  public void testAbortUploadSuccessWithParts() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    try {
+      String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
+          .STAND_ALONE, ReplicationFactor.ONE);
+      uploadPart(bucket, keyName, uploadID, 1, "data".getBytes("UTF-8"));
+      bucket.abortMultipartUpload(keyName, uploadID);
+    } catch (IOException ex) {
+      fail("testAbortUploadSuccess failed");
+    }
+  }
+
 
   private byte[] generateData(int size, byte val) {
     byte[] chars = new byte[size];

+ 7 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java

@@ -213,4 +213,11 @@ public interface KeyManager {
    */
   OmMultipartUploadCompleteInfo completeMultipartUpload(OmKeyArgs omKeyArgs,
       OmMultipartUploadList multipartUploadList) throws IOException;
+
+  /**
+   * Abort multipart upload request.
+   * @param omKeyArgs
+   * @throws IOException
+   */
+  void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException;
 }

+ 76 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java

@@ -213,6 +213,8 @@ public class KeyManagerImpl implements KeyManager {
 
     try {
       if (args.getIsMultipartKey()) {
+        Preconditions.checkArgument(args.getMultipartUploadPartNumber() > 0,
+            "PartNumber Should be greater than zero");
         // When key is multipart upload part key, we should take replication
         // type and replication factor from original key which has done
         // initiate multipart upload. If we have not found any such, we throw
@@ -686,8 +688,16 @@ public class KeyManagerImpl implements KeyManager {
       keyInfo.setDataSize(omKeyArgs.getDataSize());
       keyInfo.updateLocationInfoList(omKeyArgs.getLocationInfoList());
 
-      partName = keyName + clientID;
+      partName = metadataManager.getOzoneKey(volumeName, bucketName, keyName)
+          + clientID;
       if (multipartKeyInfo == null) {
+        // This can occur when user started uploading part by the time commit
+        // of that part happens, in between the user might have requested
+        // abort multipart upload. If we just throw exception, then the data
+        // will not be garbage collected, so move this part to delete table
+        // and throw error
+        // Move this part to delete table.
+        metadataManager.getDeletedTable().put(partName, keyInfo);
         throw new OMException("No such Multipart upload is with specified " +
             "uploadId " + uploadID, ResultCodes.NO_SUCH_MULTIPART_UPLOAD);
       } else {
@@ -886,4 +896,69 @@ public class KeyManagerImpl implements KeyManager {
       metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
     }
   }
+
+  @Override
+  public void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException {
+
+    Preconditions.checkNotNull(omKeyArgs);
+    String volumeName = omKeyArgs.getVolumeName();
+    String bucketName = omKeyArgs.getBucketName();
+    String keyName = omKeyArgs.getKeyName();
+    String uploadID = omKeyArgs.getMultipartUploadID();
+    Preconditions.checkNotNull(uploadID, "uploadID cannot be null");
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+
+    try {
+      String multipartKey = metadataManager.getMultipartKey(volumeName,
+          bucketName, keyName, uploadID);
+      OmMultipartKeyInfo multipartKeyInfo = metadataManager
+          .getMultipartInfoTable().get(multipartKey);
+      OmKeyInfo openKeyInfo = metadataManager.getOpenKeyTable().get(
+          multipartKey);
+
+      // If there is no entry in openKeyTable, then there is no multipart
+      // upload initiated for this key.
+      if (openKeyInfo == null) {
+        LOG.error("Abort Multipart Upload Failed: volume: " + volumeName +
+            "bucket: " + bucketName + "key: " + keyName + "with error no " +
+            "such uploadID:" + uploadID);
+        throw new OMException("Abort Multipart Upload Failed: volume: " +
+            volumeName + "bucket: " + bucketName + "key: " + keyName,
+            ResultCodes.NO_SUCH_MULTIPART_UPLOAD);
+      } else {
+        // Move all the parts to delete table
+        TreeMap<Integer, PartKeyInfo> partKeyInfoMap = multipartKeyInfo
+            .getPartKeyInfoList();
+        DBStore store = metadataManager.getStore();
+        try (BatchOperation batch = store.initBatchOperation()) {
+          for (Map.Entry<Integer, PartKeyInfo> partKeyInfoEntry : partKeyInfoMap
+              .entrySet()) {
+            PartKeyInfo partKeyInfo = partKeyInfoEntry.getValue();
+            OmKeyInfo currentKeyPartInfo = OmKeyInfo.getFromProtobuf(
+                partKeyInfo.getPartKeyInfo());
+            metadataManager.getDeletedTable().putWithBatch(batch,
+                partKeyInfo.getPartName(), currentKeyPartInfo);
+          }
+          // Finally delete the entry from the multipart info table and open
+          // key table
+          metadataManager.getMultipartInfoTable().deleteWithBatch(batch,
+              multipartKey);
+          metadataManager.getOpenKeyTable().deleteWithBatch(batch,
+              multipartKey);
+          store.commitBatchOperation(batch);
+        }
+      }
+    } catch (OMException ex) {
+      throw ex;
+    } catch (IOException ex) {
+      LOG.error("Abort Multipart Upload Failed: volume: " + volumeName +
+          "bucket: " + bucketName + "key: " + keyName, ex);
+      throw new OMException(ex.getMessage(), ResultCodes
+          .COMPLETE_MULTIPART_UPLOAD_FAILED);
+    } finally {
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+    }
+
+  }
+
 }

+ 19 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java

@@ -90,6 +90,8 @@ public class OMMetrics {
   private @Metric MutableCounterLong numCommitMultipartUploadParts;
   private @Metric MutableCounterLong getNumCommitMultipartUploadPartFails;
   private @Metric MutableCounterLong numCompleteMultipartUploadFails;
+  private @Metric MutableCounterLong numAbortMultipartUploads;
+  private @Metric MutableCounterLong numAbortMultipartUploadFails;
 
   // Metrics for total number of volumes, buckets and keys
 
@@ -258,6 +260,15 @@ public class OMMetrics {
     numCompleteMultipartUploadFails.incr();
   }
 
+  public void incNumAbortMultipartUploads() {
+    numKeyOps.incr();
+    numAbortMultipartUploads.incr();
+  }
+
+  public void incNumAbortMultipartUploadFails() {
+    numAbortMultipartUploadFails.incr();
+  }
+
   public void incNumGetServiceLists() {
     numGetServiceLists.incr();
   }
@@ -576,6 +587,14 @@ public class OMMetrics {
     return numInitiateMultipartUploadFails.value();
   }
 
+  public long getNumAbortMultipartUploads() {
+    return numAbortMultipartUploads.value();
+  }
+
+  public long getNumAbortMultipartUploadFails() {
+    return numAbortMultipartUploadFails.value();
+  }
+
   public void unRegister() {
     MetricsSystem ms = DefaultMetricsSystem.instance();
     ms.unregisterSource(SOURCE_NAME);

+ 19 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -1672,6 +1672,25 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+  @Override
+  public void abortMultipartUpload(OmKeyArgs omKeyArgs) throws IOException {
+
+    Map<String, String> auditMap = (omKeyArgs == null) ? new LinkedHashMap<>() :
+        omKeyArgs.toAuditMap();
+    metrics.incNumAbortMultipartUploads();
+    try {
+      keyManager.abortMultipartUpload(omKeyArgs);
+      AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction
+          .COMPLETE_MULTIPART_UPLOAD, auditMap));
+    } catch (IOException ex) {
+      metrics.incNumAbortMultipartUploadFails();
+      AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction
+          .COMPLETE_MULTIPART_UPLOAD, auditMap, ex));
+      throw ex;
+    }
+
+  }
+
 
 
 

+ 1 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java

@@ -123,6 +123,7 @@ public class OMException extends IOException {
     MISSING_UPLOAD_PARTS,
     COMPLETE_MULTIPART_UPLOAD_FAILED,
     ENTITY_TOO_SMALL,
+    ABORT_MULTIPART_UPLOAD_FAILED,
     INVALID_REQUEST;
   }
 }

+ 34 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java

@@ -98,6 +98,10 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .LookupKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .LookupKeyResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
+    MultipartUploadAbortRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartUploadAbortResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartCommitUploadPartRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -346,6 +350,12 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
       responseBuilder.setCompleteMultiPartUploadResponse(
           completeMultipartUploadResponse);
       break;
+    case AbortMultiPartUpload:
+      MultipartUploadAbortResponse multipartUploadAbortResponse =
+          abortMultipartUpload(request.getAbortMultiPartUploadRequest());
+      responseBuilder.setAbortMultiPartUploadResponse(
+          multipartUploadAbortResponse);
+      break;
     case ServiceList:
       ServiceListResponse serviceListResponse = getServiceList(
           request.getServiceListRequest());
@@ -415,6 +425,8 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
         return Status.MISSING_UPLOAD_PARTS;
       case ENTITY_TOO_SMALL:
         return Status.ENTITY_TOO_SMALL;
+      case ABORT_MULTIPART_UPLOAD_FAILED:
+        return Status.ABORT_MULTIPART_UPLOAD_FAILED;
       default:
         return Status.INTERNAL_ERROR;
       }
@@ -913,4 +925,26 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
     }
     return response.build();
   }
+
+  private MultipartUploadAbortResponse abortMultipartUpload(
+      MultipartUploadAbortRequest multipartUploadAbortRequest) {
+    MultipartUploadAbortResponse.Builder response =
+        MultipartUploadAbortResponse.newBuilder();
+
+    try {
+      KeyArgs keyArgs = multipartUploadAbortRequest.getKeyArgs();
+      OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+          .setVolumeName(keyArgs.getVolumeName())
+          .setBucketName(keyArgs.getBucketName())
+          .setKeyName(keyArgs.getKeyName())
+          .setMultipartUploadID(keyArgs.getMultipartUploadID())
+          .build();
+      impl.abortMultipartUpload(omKeyArgs);
+      response.setStatus(Status.OK);
+    } catch (IOException ex) {
+      response.setStatus(exceptionToResponseStatus(ex));
+    }
+    return response.build();
+  }
+
 }