Procházet zdrojové kódy

HDDS-1842. Implement S3 Abort MPU request to use Cache and DoubleBuffer. (#1155)

Bharat Viswanadham před 6 roky
rodič
revize
3c4159ff3d
14 změnil soubory, kde provedl 764 přidání a 99 odebrání
  1. 1 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
  2. 3 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
  3. 3 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
  4. 173 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java
  5. 11 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3InitiateMultipartUploadResponse.java
  6. 83 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponse.java
  7. 1 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java
  8. 18 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java
  9. 37 3
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
  10. 158 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadAbortRequest.java
  11. 0 55
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3InitiateMultipartResponse.java
  12. 4 41
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3InitiateMultipartUploadResponse.java
  13. 143 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java
  14. 129 0
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadAbortResponse.java

+ 1 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java

@@ -56,6 +56,7 @@ public enum OMAction implements AuditAction {
   COMMIT_MULTIPART_UPLOAD_PARTKEY,
   COMPLETE_MULTIPART_UPLOAD,
   LIST_MULTIPART_UPLOAD_PARTS,
+  ABORT_MULTIPART_UPLOAD,
 
   //FS Actions
   GET_FILE_STATUS,

+ 3 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java

@@ -176,6 +176,9 @@ public class OzoneManagerDoubleBuffer {
     omMetadataManager.getKeyTable().cleanupCache(lastRatisTransactionIndex);
     omMetadataManager.getDeletedTable().cleanupCache(lastRatisTransactionIndex);
     omMetadataManager.getS3Table().cleanupCache(lastRatisTransactionIndex);
+    omMetadataManager.getMultipartInfoTable().cleanupCache(
+        lastRatisTransactionIndex);
+
   }
 
   /**

+ 3 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequest;
 import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketCreateRequest;
 import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketDeleteRequest;
 import org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequest;
+import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadAbortRequest;
 import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequest;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeDeleteRequest;
@@ -111,6 +112,8 @@ public final class OzoneManagerRatisUtils {
       return new S3InitiateMultipartUploadRequest(omRequest);
     case CommitMultiPartUpload:
       return new S3MultipartUploadCommitPartRequest(omRequest);
+    case AbortMultiPartUpload:
+      return new S3MultipartUploadAbortRequest(omRequest);
     default:
       // TODO: will update once all request types are implemented.
       return null;

+ 173 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java

@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import java.io.IOException;
+
+import com.google.common.base.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.multipart
+    .S3MultipartUploadAbortResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartUploadAbortResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
+/**
+ * Handles Abort of multipart upload request.
+ */
+public class S3MultipartUploadAbortRequest extends OMKeyRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3MultipartUploadAbortRequest.class);
+
+  public S3MultipartUploadAbortRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    KeyArgs keyArgs =
+        getOmRequest().getAbortMultiPartUploadRequest().getKeyArgs();
+
+    return getOmRequest().toBuilder().setAbortMultiPartUploadRequest(
+        getOmRequest().getAbortMultiPartUploadRequest().toBuilder()
+            .setKeyArgs(keyArgs.toBuilder().setModificationTime(Time.now())))
+        .setUserInfo(getUserInfo()).build();
+
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long transactionLogIndex) {
+    OzoneManagerProtocolProtos.KeyArgs keyArgs =
+        getOmRequest().getAbortMultiPartUploadRequest().getKeyArgs();
+
+    String volumeName = keyArgs.getVolumeName();
+    String bucketName = keyArgs.getBucketName();
+    String keyName = keyArgs.getKeyName();
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    boolean acquiredLock = false;
+    IOException exception = null;
+    OmMultipartKeyInfo multipartKeyInfo = null;
+    String multipartKey = null;
+    try {
+      // check Acl
+      if (ozoneManager.getAclsEnabled()) {
+        checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
+            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
+            volumeName, bucketName, keyName);
+      }
+
+      acquiredLock =
+          omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
+              bucketName);
+
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+      multipartKey = omMetadataManager.getMultipartKey(volumeName,
+          bucketName, keyName, keyArgs.getMultipartUploadID());
+
+      OmKeyInfo omKeyInfo =
+          omMetadataManager.getOpenKeyTable().get(multipartKey);
+
+      // If there is no entry in openKeyTable, then there is no multipart
+      // upload initiated for this key.
+      if (omKeyInfo == null) {
+        throw new OMException("Abort Multipart Upload Failed: volume: " +
+            volumeName + "bucket: " + bucketName + "key: " + keyName,
+            OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
+      } else {
+        multipartKeyInfo = omMetadataManager
+            .getMultipartInfoTable().get(multipartKey);
+
+
+        // Update cache of openKeyTable and multipartInfo table.
+        // No need to add the cache entries to delete table, as the entries
+        // in delete table are not used by any read/write operations.
+        omMetadataManager.getOpenKeyTable().addCacheEntry(
+            new CacheKey<>(multipartKey),
+            new CacheValue<>(Optional.absent(), transactionLogIndex));
+        omMetadataManager.getMultipartInfoTable().addCacheEntry(
+            new CacheKey<>(multipartKey),
+            new CacheValue<>(Optional.absent(), transactionLogIndex));
+      }
+
+    } catch (IOException ex) {
+      exception = ex;
+    } finally {
+      if (acquiredLock) {
+        omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
+    }
+
+    // audit log
+    auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
+        OMAction.ABORT_MULTIPART_UPLOAD, buildKeyArgsAuditMap(keyArgs),
+        exception, getOmRequest().getUserInfo()));
+
+    OMResponse.Builder omResponse = OMResponse.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.AbortMultiPartUpload)
+        .setStatus(OzoneManagerProtocolProtos.Status.OK)
+        .setSuccess(true);
+
+
+    if (exception == null) {
+      LOG.debug("Abort Multipart request is successfully completed for " +
+          "KeyName {} in VolumeName/Bucket {}/{}", keyName, volumeName,
+          bucketName);
+      return new S3MultipartUploadAbortResponse(multipartKey,
+          keyArgs.getModificationTime(), multipartKeyInfo,
+          omResponse.setAbortMultiPartUploadResponse(
+              MultipartUploadAbortResponse.newBuilder()).build());
+    } else {
+      LOG.error("Abort Multipart request is failed for " +
+          "KeyName {} in VolumeName/Bucket {}/{}", keyName, volumeName,
+          bucketName, exception);
+      return new S3MultipartUploadAbortResponse(multipartKey,
+          keyArgs.getModificationTime(), multipartKeyInfo,
+          createErrorOMResponse(omResponse, exception));
+    }
+
+  }
+}

+ 11 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3InitiateMultipartUploadResponse.java

@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.ozone.om.response.s3.multipart;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
@@ -66,4 +67,14 @@ public class S3InitiateMultipartUploadResponse extends OMClientResponse {
           multipartKey, omMultipartKeyInfo);
     }
   }
+
+  @VisibleForTesting
+  public OmMultipartKeyInfo getOmMultipartKeyInfo() {
+    return omMultipartKeyInfo;
+  }
+
+  @VisibleForTesting
+  public OmKeyInfo getOmKeyInfo() {
+    return omKeyInfo;
+  }
 }

+ 83 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponse.java

@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.multipart;
+
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .PartKeyInfo;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Response for Multipart Abort Request.
+ */
+public class S3MultipartUploadAbortResponse extends OMClientResponse {
+
+  private String multipartKey;
+  private long timeStamp;
+  private OmMultipartKeyInfo omMultipartKeyInfo;
+
+  public S3MultipartUploadAbortResponse(String multipartKey,
+      long timeStamp,
+      OmMultipartKeyInfo omMultipartKeyInfo,
+      OMResponse omResponse) {
+    super(omResponse);
+    this.multipartKey = multipartKey;
+    this.timeStamp = timeStamp;
+    this.omMultipartKeyInfo = omMultipartKeyInfo;
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
+
+      // Delete from openKey table and multipart info table.
+      omMetadataManager.getOpenKeyTable().deleteWithBatch(batchOperation,
+          multipartKey);
+      omMetadataManager.getMultipartInfoTable().deleteWithBatch(batchOperation,
+          multipartKey);
+
+      // Move all the parts to delete table
+      TreeMap<Integer, PartKeyInfo > partKeyInfoMap =
+          omMultipartKeyInfo.getPartKeyInfoMap();
+      for (Map.Entry<Integer, PartKeyInfo > partKeyInfoEntry :
+          partKeyInfoMap.entrySet()) {
+        PartKeyInfo partKeyInfo = partKeyInfoEntry.getValue();
+        OmKeyInfo currentKeyPartInfo =
+            OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo());
+        omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
+            OmUtils.getDeletedKeyName(partKeyInfo.getPartName(), timeStamp),
+            currentKeyPartInfo);
+      }
+
+    }
+  }
+}

+ 1 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java

@@ -69,6 +69,7 @@ public class OzoneManagerHARequestHandlerImpl
     case DeleteS3Bucket:
     case InitiateMultiPartUpload:
     case CommitMultiPartUpload:
+    case AbortMultiPartUpload:
       //TODO: We don't need to pass transactionID, this will be removed when
       // complete write requests is changed to new model. And also we can
       // return OMClientResponse, then adding to doubleBuffer can be taken

+ 18 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java

@@ -35,6 +35,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketCreateRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartUploadAbortRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartCommitUploadPartRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -348,4 +350,20 @@ public final class TestOMRequestUtils {
         .build();
   }
 
+  public static OMRequest createAbortMPURequest(String volumeName,
+      String bucketName, String keyName, String multipartUploadID) {
+    KeyArgs.Builder keyArgs =
+        KeyArgs.newBuilder().setVolumeName(volumeName)
+            .setKeyName(keyName)
+            .setBucketName(bucketName)
+            .setMultipartUploadID(multipartUploadID);
+
+    MultipartUploadAbortRequest multipartUploadAbortRequest =
+        MultipartUploadAbortRequest.newBuilder().setKeyArgs(keyArgs).build();
+
+    return OMRequest.newBuilder().setClientId(UUID.randomUUID().toString())
+        .setCmdType(OzoneManagerProtocolProtos.Type.AbortMultiPartUpload)
+        .setAbortMultiPartUploadRequest(multipartUploadAbortRequest).build();
+  }
+
 }

+ 37 - 3
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java

@@ -19,9 +19,8 @@
 
 package org.apache.hadoop.ozone.om.request.s3.multipart;
 
-import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .OMRequest;
+import java.io.IOException;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -37,6 +36,9 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.when;
@@ -140,5 +142,37 @@ public class TestS3MultipartRequest {
     return modifiedRequest;
   }
 
+  /**
+   * Perform preExecute of Abort Multipart Upload request for given volume,
+   * bucket and keyName.
+   * @param volumeName
+   * @param bucketName
+   * @param keyName
+   * @param multipartUploadID
+   * @return OMRequest - returned from preExecute.
+   * @throws IOException
+   */
+  protected OMRequest doPreExecuteAbortMPU(
+      String volumeName, String bucketName, String keyName,
+      String multipartUploadID) throws IOException {
+
+    OMRequest omRequest =
+        TestOMRequestUtils.createAbortMPURequest(volumeName, bucketName,
+            keyName, multipartUploadID);
+
+
+    S3MultipartUploadAbortRequest s3MultipartUploadAbortRequest =
+        new S3MultipartUploadAbortRequest(omRequest);
+
+    OMRequest modifiedRequest =
+        s3MultipartUploadAbortRequest.preExecute(ozoneManager);
+
+    // UserInfo and modification time is set.
+    Assert.assertNotEquals(omRequest, modifiedRequest);
+
+    return modifiedRequest;
+
+  }
+
 
 }

+ 158 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadAbortRequest.java

@@ -0,0 +1,158 @@
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+
+/**
+ * Test Multipart upload abort request.
+ */
+public class TestS3MultipartUploadAbortRequest extends TestS3MultipartRequest {
+
+
+  @Test
+  public void testPreExecute() throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    doPreExecuteAbortMPU(volumeName, bucketName, keyName,
+        UUID.randomUUID().toString());
+  }
+
+  @Test
+  public void testValidateAndUpdateCache() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager);
+
+    OMRequest initiateMPURequest = doPreExecuteInitiateMPU(volumeName,
+        bucketName, keyName);
+
+    S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
+        new S3InitiateMultipartUploadRequest(initiateMPURequest);
+
+    OMClientResponse omClientResponse =
+        s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager,
+            1L);
+
+    String multipartUploadID = omClientResponse.getOMResponse()
+        .getInitiateMultiPartUploadResponse().getMultipartUploadID();
+
+    OMRequest abortMPURequest =
+        doPreExecuteAbortMPU(volumeName, bucketName, keyName,
+            multipartUploadID);
+
+    S3MultipartUploadAbortRequest s3MultipartUploadAbortRequest =
+        new S3MultipartUploadAbortRequest(abortMPURequest);
+
+    omClientResponse =
+        s3MultipartUploadAbortRequest.validateAndUpdateCache(ozoneManager, 2L);
+
+
+    String multipartKey = omMetadataManager.getMultipartKey(volumeName,
+        bucketName, keyName, multipartUploadID);
+
+    // Check table and response.
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omClientResponse.getOMResponse().getStatus());
+    Assert.assertNull(
+        omMetadataManager.getMultipartInfoTable().get(multipartKey));
+    Assert.assertNull(omMetadataManager.getOpenKeyTable().get(multipartKey));
+
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheMultipartNotFound() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager);
+
+    String multipartUploadID = "randomMPU";
+
+    OMRequest abortMPURequest =
+        doPreExecuteAbortMPU(volumeName, bucketName, keyName,
+            multipartUploadID);
+
+    S3MultipartUploadAbortRequest s3MultipartUploadAbortRequest =
+        new S3MultipartUploadAbortRequest(abortMPURequest);
+
+    OMClientResponse omClientResponse =
+        s3MultipartUploadAbortRequest.validateAndUpdateCache(ozoneManager, 2L);
+
+    // Check table and response.
+    Assert.assertEquals(
+        OzoneManagerProtocolProtos.Status.NO_SUCH_MULTIPART_UPLOAD_ERROR,
+        omClientResponse.getOMResponse().getStatus());
+
+  }
+
+
+  @Test
+  public void testValidateAndUpdateCacheVolumeNotFound() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+
+    String multipartUploadID = "randomMPU";
+
+    OMRequest abortMPURequest =
+        doPreExecuteAbortMPU(volumeName, bucketName, keyName,
+            multipartUploadID);
+
+    S3MultipartUploadAbortRequest s3MultipartUploadAbortRequest =
+        new S3MultipartUploadAbortRequest(abortMPURequest);
+
+    OMClientResponse omClientResponse =
+        s3MultipartUploadAbortRequest.validateAndUpdateCache(ozoneManager, 2L);
+
+    // Check table and response.
+    Assert.assertEquals(
+        OzoneManagerProtocolProtos.Status.VOLUME_NOT_FOUND,
+        omClientResponse.getOMResponse().getStatus());
+
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheBucketNotFound() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+
+    TestOMRequestUtils.addVolumeToDB(volumeName, omMetadataManager);
+
+    String multipartUploadID = "randomMPU";
+
+    OMRequest abortMPURequest =
+        doPreExecuteAbortMPU(volumeName, bucketName, keyName,
+            multipartUploadID);
+
+    S3MultipartUploadAbortRequest s3MultipartUploadAbortRequest =
+        new S3MultipartUploadAbortRequest(abortMPURequest);
+
+    OMClientResponse omClientResponse =
+        s3MultipartUploadAbortRequest.validateAndUpdateCache(ozoneManager, 2L);
+
+    // Check table and response.
+    Assert.assertEquals(
+        OzoneManagerProtocolProtos.Status.BUCKET_NOT_FOUND,
+        omClientResponse.getOMResponse().getStatus());
+
+  }
+}

+ 0 - 55
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3InitiateMultipartResponse.java

@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.ozone.om.response.s3.multipart;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
-import org.apache.hadoop.utils.db.BatchOperation;
-
-/**
- * Base test class for S3 MPU response.
- */
-
-@SuppressWarnings("VisibilityModifier")
-public class TestS3InitiateMultipartResponse {
-
-  @Rule
-  public TemporaryFolder folder = new TemporaryFolder();
-
-  protected OMMetadataManager omMetadataManager;
-  protected BatchOperation batchOperation;
-
-  @Before
-  public void setup() throws Exception {
-    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
-    ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
-        folder.newFolder().getAbsolutePath());
-    omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
-    batchOperation = omMetadataManager.getStore().initBatchOperation();
-  }
-
-
-}

+ 4 - 41
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3InitiateMultipartUploadResponse.java

@@ -19,28 +19,16 @@
 
 package org.apache.hadoop.ozone.om.response.s3.multipart;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.UUID;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartInfoInitiateResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
-import org.apache.hadoop.util.Time;
-
 /**
  * Class tests S3 Initiate MPU response.
  */
 public class TestS3InitiateMultipartUploadResponse
-    extends TestS3InitiateMultipartResponse {
+    extends TestS3MultipartResponse {
 
   @Test
   public void addDBToBatch() throws Exception {
@@ -49,35 +37,10 @@ public class TestS3InitiateMultipartUploadResponse
     String keyName = UUID.randomUUID().toString();
     String multipartUploadID = UUID.randomUUID().toString();
 
-
-    OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo(
-        multipartUploadID, new HashMap<>());
-
-    OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
-        .setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setKeyName(keyName)
-        .setCreationTime(Time.now())
-        .setModificationTime(Time.now())
-        .setReplicationType(HddsProtos.ReplicationType.RATIS)
-        .setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
-        .setOmKeyLocationInfos(Collections.singletonList(
-            new OmKeyLocationInfoGroup(0, new ArrayList<>())))
-        .build();
-
-    OMResponse omResponse = OMResponse.newBuilder()
-        .setCmdType(OzoneManagerProtocolProtos.Type.InitiateMultiPartUpload)
-        .setStatus(OzoneManagerProtocolProtos.Status.OK)
-        .setSuccess(true).setInitiateMultiPartUploadResponse(
-            MultipartInfoInitiateResponse.newBuilder()
-            .setVolumeName(volumeName)
-            .setBucketName(bucketName)
-            .setKeyName(keyName)
-            .setMultipartUploadID(multipartUploadID)).build();
-
     S3InitiateMultipartUploadResponse s3InitiateMultipartUploadResponse =
-        new S3InitiateMultipartUploadResponse(multipartKeyInfo, omKeyInfo,
-            omResponse);
+        createS3InitiateMPUResponse(volumeName, bucketName, keyName,
+            multipartUploadID);
+
 
     s3InitiateMultipartUploadResponse.addToDBBatch(omMetadataManager,
         batchOperation);

+ 143 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java

@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.multipart;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.UUID;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .KeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartUploadAbortResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .PartKeyInfo;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+/**
+ * Base test class for S3 MPU response.
+ */
+
+@SuppressWarnings("VisibilityModifier")
+public class TestS3MultipartResponse {
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  protected OMMetadataManager omMetadataManager;
+  protected BatchOperation batchOperation;
+
+  @Before
+  public void setup() throws Exception {
+    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
+        folder.newFolder().getAbsolutePath());
+    omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
+    batchOperation = omMetadataManager.getStore().initBatchOperation();
+  }
+
+
+  public S3InitiateMultipartUploadResponse createS3InitiateMPUResponse(
+      String volumeName, String bucketName, String keyName,
+      String multipartUploadID) {
+    OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo(
+        multipartUploadID, new HashMap<>());
+
+    OmKeyInfo omKeyInfo = new OmKeyInfo.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setCreationTime(Time.now())
+        .setModificationTime(Time.now())
+        .setReplicationType(HddsProtos.ReplicationType.RATIS)
+        .setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
+        .setOmKeyLocationInfos(Collections.singletonList(
+            new OmKeyLocationInfoGroup(0, new ArrayList<>())))
+        .build();
+
+    OMResponse omResponse = OMResponse.newBuilder()
+            .setCmdType(OzoneManagerProtocolProtos.Type.InitiateMultiPartUpload)
+            .setStatus(OzoneManagerProtocolProtos.Status.OK)
+            .setSuccess(true).setInitiateMultiPartUploadResponse(
+            OzoneManagerProtocolProtos.MultipartInfoInitiateResponse
+                .newBuilder().setVolumeName(volumeName)
+                .setBucketName(bucketName)
+                .setKeyName(keyName)
+                .setMultipartUploadID(multipartUploadID)).build();
+
+    return new S3InitiateMultipartUploadResponse(multipartKeyInfo, omKeyInfo,
+            omResponse);
+  }
+
+  public S3MultipartUploadAbortResponse createS3AbortMPUResponse(
+      String multipartKey, long timeStamp,
+      OmMultipartKeyInfo omMultipartKeyInfo) {
+    OMResponse omResponse = OMResponse.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.AbortMultiPartUpload)
+        .setStatus(OzoneManagerProtocolProtos.Status.OK)
+        .setSuccess(true)
+        .setAbortMultiPartUploadResponse(
+            MultipartUploadAbortResponse.newBuilder().build()).build();
+
+    return new S3MultipartUploadAbortResponse(multipartKey, Time.now(),
+            omMultipartKeyInfo,
+            omResponse);
+  }
+
+
+  public void addPart(int partNumber, PartKeyInfo partKeyInfo,
+      OmMultipartKeyInfo omMultipartKeyInfo) {
+    omMultipartKeyInfo.addPartKeyInfo(partNumber, partKeyInfo);
+  }
+
+  public PartKeyInfo createPartKeyInfo(
+      String volumeName, String bucketName, String keyName, int partNumber) {
+    return PartKeyInfo.newBuilder()
+        .setPartNumber(partNumber)
+        .setPartName(omMetadataManager.getMultipartKey(volumeName,
+            bucketName, keyName, UUID.randomUUID().toString()))
+        .setPartKeyInfo(KeyInfo.newBuilder()
+            .setVolumeName(volumeName)
+            .setBucketName(bucketName)
+            .setKeyName(keyName)
+            .setDataSize(100L) // Just set dummy size for testing
+            .setCreationTime(Time.now())
+            .setModificationTime(Time.now())
+            .setType(HddsProtos.ReplicationType.RATIS)
+            .setFactor(HddsProtos.ReplicationFactor.ONE).build()).build();
+  }
+}

+ 129 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadAbortResponse.java

@@ -0,0 +1,129 @@
+package org.apache.hadoop.ozone.om.response.s3.multipart;
+
+import java.util.UUID;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .PartKeyInfo;
+import org.apache.hadoop.util.Time;
+
+/**
+ * Test multipart upload abort response.
+ */
+public class TestS3MultipartUploadAbortResponse
+    extends TestS3MultipartResponse {
+
+
+  @Test
+  public void testAddDBToBatch() throws Exception {
+
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+    String multipartUploadID = UUID.randomUUID().toString();
+    String multipartKey = omMetadataManager.getMultipartKey(volumeName,
+        bucketName, keyName, multipartUploadID);
+
+    S3InitiateMultipartUploadResponse s3InitiateMultipartUploadResponse =
+        createS3InitiateMPUResponse(volumeName, bucketName, keyName,
+            multipartUploadID);
+
+    s3InitiateMultipartUploadResponse.addToDBBatch(omMetadataManager,
+        batchOperation);
+
+    S3MultipartUploadAbortResponse s3MultipartUploadAbortResponse =
+        createS3AbortMPUResponse(multipartKey, Time.now(),
+            s3InitiateMultipartUploadResponse.getOmMultipartKeyInfo());
+
+    s3MultipartUploadAbortResponse.addToDBBatch(omMetadataManager,
+        batchOperation);
+
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+
+    Assert.assertNull(omMetadataManager.getOpenKeyTable().get(multipartKey));
+    Assert.assertNull(
+        omMetadataManager.getMultipartInfoTable().get(multipartKey));
+
+    // As no parts are created, so no entries should be there in delete table.
+    Assert.assertTrue(omMetadataManager.countRowsInTable(
+        omMetadataManager.getDeletedTable()) == 0);
+  }
+
+  @Test
+  public void testAddDBToBatchWithParts() throws Exception {
+
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+    String multipartUploadID = UUID.randomUUID().toString();
+    String multipartKey = omMetadataManager.getMultipartKey(volumeName,
+        bucketName, keyName, multipartUploadID);
+
+    S3InitiateMultipartUploadResponse s3InitiateMultipartUploadResponse =
+        createS3InitiateMPUResponse(volumeName, bucketName, keyName,
+            multipartUploadID);
+
+    s3InitiateMultipartUploadResponse.addToDBBatch(omMetadataManager,
+        batchOperation);
+
+
+    // Add some dummy parts for testing.
+    // Not added any key locations, as this just test is to see entries are
+    // adding to delete table or not.
+
+    OmMultipartKeyInfo omMultipartKeyInfo =
+        s3InitiateMultipartUploadResponse.getOmMultipartKeyInfo();
+
+    PartKeyInfo part1 = createPartKeyInfo(volumeName, bucketName,
+        keyName, 1);
+    PartKeyInfo part2 = createPartKeyInfo(volumeName, bucketName,
+        keyName, 1);
+
+    addPart(1, part1, omMultipartKeyInfo);
+    addPart(2, part2, omMultipartKeyInfo);
+
+
+    long timeStamp = Time.now();
+    S3MultipartUploadAbortResponse s3MultipartUploadAbortResponse =
+        createS3AbortMPUResponse(multipartKey, timeStamp,
+            s3InitiateMultipartUploadResponse.getOmMultipartKeyInfo());
+
+    s3MultipartUploadAbortResponse.addToDBBatch(omMetadataManager,
+        batchOperation);
+
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+
+    Assert.assertNull(omMetadataManager.getOpenKeyTable().get(multipartKey));
+    Assert.assertNull(
+        omMetadataManager.getMultipartInfoTable().get(multipartKey));
+
+    // As 2 parts are created, so 2 entries should be there in delete table.
+    Assert.assertTrue(omMetadataManager.countRowsInTable(
+        omMetadataManager.getDeletedTable()) == 2);
+
+    String part1DeletedKeyName = OmUtils.getDeletedKeyName(
+        omMultipartKeyInfo.getPartKeyInfo(1).getPartName(),
+        timeStamp);
+
+    String part2DeletedKeyName = OmUtils.getDeletedKeyName(
+        omMultipartKeyInfo.getPartKeyInfo(2).getPartName(),
+        timeStamp);
+
+    Assert.assertNotNull(omMetadataManager.getDeletedTable().get(
+        part1DeletedKeyName));
+    Assert.assertNotNull(omMetadataManager.getDeletedTable().get(
+        part2DeletedKeyName));
+
+    Assert.assertEquals(OmKeyInfo.getFromProtobuf(part1.getPartKeyInfo()),
+        omMetadataManager.getDeletedTable().get(part1DeletedKeyName));
+
+    Assert.assertEquals(OmKeyInfo.getFromProtobuf(part2.getPartKeyInfo()),
+        omMetadataManager.getDeletedTable().get(part2DeletedKeyName));
+  }
+
+}