Forráskód Böngészése

HDDS-1819. Implement S3 Commit MPU request to use Cache and DoubleBuffer. (#1140)

Bharat Viswanadham 5 éve
szülő
commit
1d98a212cb

+ 3 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.om.request.key.OMKeyRenameRequest;
 import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketCreateRequest;
 import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketDeleteRequest;
 import org.apache.hadoop.ozone.om.request.s3.multipart.S3InitiateMultipartUploadRequest;
+import org.apache.hadoop.ozone.om.request.s3.multipart.S3MultipartUploadCommitPartRequest;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeCreateRequest;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeDeleteRequest;
 import org.apache.hadoop.ozone.om.request.volume.OMVolumeSetOwnerRequest;
@@ -108,6 +109,8 @@ public final class OzoneManagerRatisUtils {
       return new S3BucketDeleteRequest(omRequest);
     case InitiateMultiPartUpload:
       return new S3InitiateMultipartUploadRequest(omRequest);
+    case CommitMultiPartUpload:
+      return new S3MultipartUploadCommitPartRequest(omRequest);
     default:
       // TODO: will update once all request types are implemented.
       return null;

+ 217 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java

@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.multipart
+    .S3MultipartUploadCommitPartResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartCommitUploadPartRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartCommitUploadPartResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.db.cache.CacheKey;
+import org.apache.hadoop.utils.db.cache.CacheValue;
+
+import java.io.IOException;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+
+/**
+ * Handle Multipart upload commit upload part file.
+ */
+public class S3MultipartUploadCommitPartRequest extends OMKeyRequest {
+
+  public S3MultipartUploadCommitPartRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) {
+    MultipartCommitUploadPartRequest multipartCommitUploadPartRequest =
+        getOmRequest().getCommitMultiPartUploadRequest();
+
+    return getOmRequest().toBuilder().setCommitMultiPartUploadRequest(
+        multipartCommitUploadPartRequest.toBuilder()
+            .setKeyArgs(multipartCommitUploadPartRequest.getKeyArgs()
+                .toBuilder().setModificationTime(Time.now())))
+        .setUserInfo(getUserInfo()).build();
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long transactionLogIndex) {
+    MultipartCommitUploadPartRequest multipartCommitUploadPartRequest =
+        getOmRequest().getCommitMultiPartUploadRequest();
+
+    OzoneManagerProtocolProtos.KeyArgs keyArgs =
+        multipartCommitUploadPartRequest.getKeyArgs();
+
+    String volumeName = keyArgs.getVolumeName();
+    String bucketName = keyArgs.getBucketName();
+    String keyName = keyArgs.getKeyName();
+
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    ozoneManager.getMetrics().incNumCommitMultipartUploadParts();
+
+    boolean acquiredLock = false;
+    OmMultipartKeyInfo multipartKeyInfo = null;
+    OmKeyInfo omKeyInfo = null;
+    String openKey = null;
+    String multipartKey = null;
+    OzoneManagerProtocolProtos.PartKeyInfo oldPartKeyInfo = null;
+    IOException exception = null;
+    String partName = null;
+    try {
+      // check Acl
+      if (ozoneManager.getAclsEnabled()) {
+        checkAcls(ozoneManager, OzoneObj.ResourceType.KEY,
+            OzoneObj.StoreType.OZONE, IAccessAuthorizer.ACLType.WRITE,
+            volumeName, bucketName, keyName);
+      }
+
+      acquiredLock =
+          omMetadataManager.getLock().acquireLock(BUCKET_LOCK, volumeName,
+              bucketName);
+
+      validateBucketAndVolume(omMetadataManager, volumeName, bucketName);
+
+      String uploadID = keyArgs.getMultipartUploadID();
+      multipartKey = omMetadataManager.getMultipartKey(volumeName, bucketName,
+          keyName, uploadID);
+
+      multipartKeyInfo = omMetadataManager
+          .getMultipartInfoTable().get(multipartKey);
+
+      long clientID = multipartCommitUploadPartRequest.getClientID();
+
+      openKey = omMetadataManager.getOpenKey(
+          volumeName, bucketName, keyName, clientID);
+
+      omKeyInfo = omMetadataManager.getOpenKeyTable().get(openKey);
+
+
+      if (omKeyInfo == null) {
+        throw new OMException("Failed to commit Multipart Upload key, as " +
+            openKey + "entry is not found in the openKey table", KEY_NOT_FOUND);
+      }
+
+      // set the data size and location info list
+      omKeyInfo.setDataSize(keyArgs.getDataSize());
+      omKeyInfo.updateLocationInfoList(keyArgs.getKeyLocationsList().stream()
+          .map(OmKeyLocationInfo::getFromProtobuf)
+          .collect(Collectors.toList()));
+      // Set Modification time
+      omKeyInfo.setModificationTime(keyArgs.getModificationTime());
+
+      partName = omMetadataManager.getOzoneKey(volumeName, bucketName,
+          keyName) + clientID;
+
+      if (multipartKeyInfo == null) {
+        // This can occur when user started uploading part by the time commit
+        // of that part happens, in between the user might have requested
+        // abort multipart upload. If we just throw exception, then the data
+        // will not be garbage collected, so move this part to delete table
+        // and throw error
+        // Move this part to delete table.
+        throw new OMException("No such Multipart upload is with specified " +
+            "uploadId " + uploadID,
+            OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR);
+      } else {
+        int partNumber = keyArgs.getMultipartNumber();
+        oldPartKeyInfo = multipartKeyInfo.getPartKeyInfo(partNumber);
+
+        // Build this multipart upload part info.
+        OzoneManagerProtocolProtos.PartKeyInfo.Builder partKeyInfo =
+            OzoneManagerProtocolProtos.PartKeyInfo.newBuilder();
+        partKeyInfo.setPartName(partName);
+        partKeyInfo.setPartNumber(partNumber);
+        partKeyInfo.setPartKeyInfo(omKeyInfo.getProtobuf());
+
+        // Add this part information in to multipartKeyInfo.
+        multipartKeyInfo.addPartKeyInfo(partNumber, partKeyInfo.build());
+
+        // Add to cache.
+
+        // Delete from open key table and add it to multipart info table.
+        // No need to add cache entries to delete table, as no
+        // read/write requests that info for validation.
+        omMetadataManager.getMultipartInfoTable().addCacheEntry(
+            new CacheKey<>(multipartKey),
+            new CacheValue<>(Optional.of(multipartKeyInfo),
+                transactionLogIndex));
+
+        omMetadataManager.getOpenKeyTable().addCacheEntry(
+            new CacheKey<>(openKey),
+            new CacheValue<>(Optional.absent(), transactionLogIndex));
+      }
+
+    } catch (IOException ex) {
+      exception = ex;
+    } finally {
+      if (acquiredLock) {
+        omMetadataManager.getLock().releaseLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
+    }
+
+    // audit log
+    auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
+        OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY, buildKeyArgsAuditMap(keyArgs),
+        exception, getOmRequest().getUserInfo()));
+
+    OMResponse.Builder omResponse = OMResponse.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.CommitMultiPartUpload)
+        .setStatus(OzoneManagerProtocolProtos.Status.OK)
+        .setSuccess(true);
+
+    if (exception == null) {
+      omResponse.setCommitMultiPartUploadResponse(
+          MultipartCommitUploadPartResponse.newBuilder().setPartName(partName));
+      return new S3MultipartUploadCommitPartResponse(multipartKey, openKey,
+          keyArgs.getModificationTime(), omKeyInfo, multipartKeyInfo,
+          oldPartKeyInfo, omResponse.build());
+    } else {
+      ozoneManager.getMetrics().incNumCommitMultipartUploadPartFails();
+      return new S3MultipartUploadCommitPartResponse(multipartKey, openKey,
+          keyArgs.getModificationTime(), omKeyInfo, multipartKeyInfo,
+          oldPartKeyInfo, createErrorOMResponse(omResponse, exception));
+
+    }
+  }
+}
+

+ 109 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponse.java

@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.multipart;
+
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.utils.db.BatchOperation;
+
+import java.io.IOException;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .Status.NO_SUCH_MULTIPART_UPLOAD_ERROR;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .Status.OK;
+
+/**
+ * Response for S3MultipartUploadCommitPart request.
+ */
+public class S3MultipartUploadCommitPartResponse extends OMClientResponse {
+
+  private String multipartKey;
+  private String openKey;
+  private long deleteTimeStamp;
+  private OmKeyInfo deletePartKeyInfo;
+  private OmMultipartKeyInfo omMultipartKeyInfo;
+  private OzoneManagerProtocolProtos.PartKeyInfo oldMultipartKeyInfo;
+
+
+  public S3MultipartUploadCommitPartResponse(String multipartKey,
+      String openKey, long deleteTimeStamp,
+      OmKeyInfo deletePartKeyInfo, OmMultipartKeyInfo omMultipartKeyInfo,
+      OzoneManagerProtocolProtos.PartKeyInfo oldPartKeyInfo,
+      OMResponse omResponse) {
+    super(omResponse);
+    this.multipartKey = multipartKey;
+    this.openKey = openKey;
+    this.deleteTimeStamp = deleteTimeStamp;
+    this.deletePartKeyInfo = deletePartKeyInfo;
+    this.omMultipartKeyInfo = omMultipartKeyInfo;
+    this.oldMultipartKeyInfo = oldPartKeyInfo;
+  }
+
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+
+    if (getOMResponse().getStatus() == NO_SUCH_MULTIPART_UPLOAD_ERROR) {
+      // Means by the time we try to commit part, some one has aborted this
+      // multipart upload. So, delete this part information.
+      omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
+          OmUtils.getDeletedKeyName(openKey, deleteTimeStamp),
+          deletePartKeyInfo);
+    }
+
+    if (getOMResponse().getStatus() == OK) {
+
+      // If we have old part info:
+      // Need to do 3 steps:
+      //   1. add old part to delete table
+      //   2. Commit multipart info which has information about this new part.
+      //   3. delete this new part entry from open key table.
+
+      // This means for this multipart upload part upload, we have an old
+      // part information, so delete it.
+      if (oldMultipartKeyInfo != null) {
+        omMetadataManager.getDeletedTable().putWithBatch(batchOperation,
+            OmUtils.getDeletedKeyName(oldMultipartKeyInfo.getPartName(),
+                deleteTimeStamp),
+            OmKeyInfo.getFromProtobuf(oldMultipartKeyInfo.getPartKeyInfo()));
+      }
+
+
+      omMetadataManager.getMultipartInfoTable().putWithBatch(batchOperation,
+          multipartKey, omMultipartKeyInfo);
+
+      //  This information has been added to multipartKeyInfo. So, we can
+      //  safely delete part key info from open key table.
+      omMetadataManager.getOpenKeyTable().deleteWithBatch(batchOperation,
+          openKey);
+
+
+    }
+  }
+
+}
+

+ 7 - 8
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerHARequestHandlerImpl.java

@@ -26,8 +26,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .Status;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .Type;
 
@@ -70,6 +68,7 @@ public class OzoneManagerHARequestHandlerImpl
     case CreateS3Bucket:
     case DeleteS3Bucket:
     case InitiateMultiPartUpload:
+    case CommitMultiPartUpload:
       //TODO: We don't need to pass transactionID, this will be removed when
       // complete write requests is changed to new model. And also we can
       // return OMClientResponse, then adding to doubleBuffer can be taken
@@ -81,12 +80,12 @@ public class OzoneManagerHARequestHandlerImpl
           omClientRequest.validateAndUpdateCache(getOzoneManager(),
               transactionLogIndex);
 
-      // If any error we have got when validateAndUpdateCache, OMResponse
-      // Status is set with Error Code other than OK, in that case don't
-      // add this to double buffer.
-      if (omClientResponse.getOMResponse().getStatus() == Status.OK) {
-        ozoneManagerDoubleBuffer.add(omClientResponse, transactionLogIndex);
-      }
+
+      // Add OMClient Response to double buffer.
+      // Each OMClient Response should handle what needs to be done in error
+      // case.
+      ozoneManagerDoubleBuffer.add(omClientResponse, transactionLogIndex);
+
       return omClientResponse.getOMResponse();
     default:
       // As all request types are not changed so we need to call handle

+ 32 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/TestOMRequestUtils.java

@@ -35,6 +35,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.request.s3.bucket.S3BucketCreateRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartCommitUploadPartRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -316,4 +318,34 @@ public final class TestOMRequestUtils {
         .build();
   }
 
+  /**
+   * Create OMRequest which encapsulates InitiateMultipartUpload request.
+   * @param volumeName
+   * @param bucketName
+   * @param keyName
+   */
+  public static OMRequest createCommitPartMPURequest(String volumeName,
+      String bucketName, String keyName, long clientID, long size,
+      String multipartUploadID, int partNumber) {
+
+    // Just set dummy size.
+    KeyArgs.Builder keyArgs =
+        KeyArgs.newBuilder().setVolumeName(volumeName).setKeyName(keyName)
+            .setBucketName(bucketName)
+            .setDataSize(size)
+            .setMultipartNumber(partNumber)
+            .setMultipartUploadID(multipartUploadID)
+            .addAllKeyLocations(new ArrayList<>());
+    // Just adding dummy list. As this is for UT only.
+
+    MultipartCommitUploadPartRequest multipartCommitUploadPartRequest =
+        MultipartCommitUploadPartRequest.newBuilder()
+            .setKeyArgs(keyArgs).setClientID(clientID).build();
+
+    return OMRequest.newBuilder().setClientId(UUID.randomUUID().toString())
+        .setCmdType(OzoneManagerProtocolProtos.Type.CommitMultiPartUpload)
+        .setCommitMultiPartUploadRequest(multipartCommitUploadPartRequest)
+        .build();
+  }
+
 }

+ 8 - 29
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3InitiateMultipartUploadRequest.java

@@ -37,8 +37,8 @@ public class TestS3InitiateMultipartUploadRequest
 
   @Test
   public void testPreExecute() {
-    doPreExecute(UUID.randomUUID().toString(), UUID.randomUUID().toString(),
-        UUID.randomUUID().toString());
+    doPreExecuteInitiateMPU(UUID.randomUUID().toString(),
+        UUID.randomUUID().toString(), UUID.randomUUID().toString());
   }
 
 
@@ -52,7 +52,8 @@ public class TestS3InitiateMultipartUploadRequest
     TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
         omMetadataManager);
 
-    OMRequest modifiedRequest = doPreExecute(volumeName, bucketName, keyName);
+    OMRequest modifiedRequest = doPreExecuteInitiateMPU(volumeName,
+        bucketName, keyName);
 
     S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
         new S3InitiateMultipartUploadRequest(modifiedRequest);
@@ -97,7 +98,8 @@ public class TestS3InitiateMultipartUploadRequest
 
     TestOMRequestUtils.addVolumeToDB(volumeName, omMetadataManager);
 
-    OMRequest modifiedRequest = doPreExecute(volumeName, bucketName, keyName);
+    OMRequest modifiedRequest = doPreExecuteInitiateMPU(
+        volumeName, bucketName, keyName);
 
     S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
         new S3InitiateMultipartUploadRequest(modifiedRequest);
@@ -126,7 +128,8 @@ public class TestS3InitiateMultipartUploadRequest
     String keyName = UUID.randomUUID().toString();
 
 
-    OMRequest modifiedRequest = doPreExecute(volumeName, bucketName, keyName);
+    OMRequest modifiedRequest = doPreExecuteInitiateMPU(volumeName, bucketName,
+        keyName);
 
     S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
         new S3InitiateMultipartUploadRequest(modifiedRequest);
@@ -147,28 +150,4 @@ public class TestS3InitiateMultipartUploadRequest
         .get(multipartKey));
 
   }
-
-
-
-  private OMRequest doPreExecute(String volumeName, String bucketName,
-      String keyName) {
-    OMRequest omRequest =
-        TestOMRequestUtils.createInitiateMPURequest(volumeName, bucketName,
-            keyName);
-
-    S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
-        new S3InitiateMultipartUploadRequest(omRequest);
-
-    OMRequest modifiedRequest =
-        s3InitiateMultipartUploadRequest.preExecute(ozoneManager);
-
-    Assert.assertNotEquals(omRequest, modifiedRequest);
-    Assert.assertTrue(modifiedRequest.hasInitiateMultiPartUploadRequest());
-    Assert.assertNotNull(modifiedRequest.getInitiateMultiPartUploadRequest()
-        .getKeyArgs().getMultipartUploadID());
-    Assert.assertTrue(modifiedRequest.getInitiateMultiPartUploadRequest()
-        .getKeyArgs().getModificationTime() > 0);
-
-    return modifiedRequest;
-  }
 }

+ 68 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java

@@ -19,7 +19,11 @@
 
 package org.apache.hadoop.ozone.om.request.s3.multipart;
 
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
@@ -73,4 +77,68 @@ public class TestS3MultipartRequest {
     Mockito.framework().clearInlineMocks();
   }
 
+  /**
+   * Perform preExecute of Initiate Multipart upload request for given
+   * volume, bucket and key name.
+   * @param volumeName
+   * @param bucketName
+   * @param keyName
+   * @return OMRequest - returned from preExecute.
+   */
+  protected OMRequest doPreExecuteInitiateMPU(
+      String volumeName, String bucketName, String keyName) {
+    OMRequest omRequest =
+        TestOMRequestUtils.createInitiateMPURequest(volumeName, bucketName,
+            keyName);
+
+    S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
+        new S3InitiateMultipartUploadRequest(omRequest);
+
+    OMRequest modifiedRequest =
+        s3InitiateMultipartUploadRequest.preExecute(ozoneManager);
+
+    Assert.assertNotEquals(omRequest, modifiedRequest);
+    Assert.assertTrue(modifiedRequest.hasInitiateMultiPartUploadRequest());
+    Assert.assertNotNull(modifiedRequest.getInitiateMultiPartUploadRequest()
+        .getKeyArgs().getMultipartUploadID());
+    Assert.assertTrue(modifiedRequest.getInitiateMultiPartUploadRequest()
+        .getKeyArgs().getModificationTime() > 0);
+
+    return modifiedRequest;
+  }
+
+  /**
+   * Perform preExecute of Commit Multipart Upload request for given volume,
+   * bucket and keyName.
+   * @param volumeName
+   * @param bucketName
+   * @param keyName
+   * @param clientID
+   * @param multipartUploadID
+   * @param partNumber
+   * @return OMRequest - returned from preExecute.
+   */
+  protected OMRequest doPreExecuteCommitMPU(
+      String volumeName, String bucketName, String keyName,
+      long clientID, String multipartUploadID, int partNumber) {
+
+    // Just set dummy size
+    long dataSize = 100L;
+    OMRequest omRequest =
+        TestOMRequestUtils.createCommitPartMPURequest(volumeName, bucketName,
+            keyName, clientID, dataSize, multipartUploadID, partNumber);
+    S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+        new S3MultipartUploadCommitPartRequest(omRequest);
+
+
+    OMRequest modifiedRequest =
+        s3MultipartUploadCommitPartRequest.preExecute(ozoneManager);
+
+    // UserInfo and modification time is set.
+    Assert.assertNotEquals(omRequest, modifiedRequest);
+
+    return modifiedRequest;
+  }
+
+
 }

+ 209 - 0
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartUploadCommitPartRequest.java

@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.multipart;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+
+/**
+ * Tests S3 Multipart upload commit part request.
+ */
+public class TestS3MultipartUploadCommitPartRequest
+    extends TestS3MultipartRequest {
+
+  @Test
+  public void testPreExecute() {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    doPreExecuteCommitMPU(volumeName, bucketName, keyName, Time.now(),
+        UUID.randomUUID().toString(), 1);
+  }
+
+
+  @Test
+  public void testValidateAndUpdateCacheSuccess() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager);
+
+    OMRequest initiateMPURequest = doPreExecuteInitiateMPU(volumeName,
+        bucketName, keyName);
+
+    S3InitiateMultipartUploadRequest s3InitiateMultipartUploadRequest =
+        new S3InitiateMultipartUploadRequest(initiateMPURequest);
+
+    OMClientResponse omClientResponse =
+        s3InitiateMultipartUploadRequest.validateAndUpdateCache(ozoneManager,
+        1L);
+
+    long clientID = Time.now();
+    String multipartUploadID = omClientResponse.getOMResponse()
+        .getInitiateMultiPartUploadResponse().getMultipartUploadID();
+
+    OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName,
+        bucketName, keyName, clientID, multipartUploadID, 1);
+
+    S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+        new S3MultipartUploadCommitPartRequest(commitMultipartRequest);
+
+    // Add key to open key table.
+    TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName,
+        keyName, clientID, HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+
+    omClientResponse =
+        s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager,
+        2L);
+
+
+    Assert.assertTrue(omClientResponse.getOMResponse().getStatus()
+        == OzoneManagerProtocolProtos.Status.OK);
+
+    String multipartKey = omMetadataManager.getMultipartKey(volumeName,
+        bucketName, keyName, multipartUploadID);
+
+    Assert.assertNotNull(
+        omMetadataManager.getMultipartInfoTable().get(multipartKey));
+    Assert.assertTrue(omMetadataManager.getMultipartInfoTable()
+        .get(multipartKey).getPartKeyInfoMap().size() == 1);
+    Assert.assertNull(omMetadataManager.getOpenKeyTable()
+        .get(omMetadataManager.getOpenKey(volumeName, bucketName, keyName,
+            clientID)));
+
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheMultipartNotFound() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager);
+
+
+    long clientID = Time.now();
+    String multipartUploadID = UUID.randomUUID().toString();
+
+    OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName,
+        bucketName, keyName, clientID, multipartUploadID, 1);
+
+    S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+        new S3MultipartUploadCommitPartRequest(commitMultipartRequest);
+
+    // Add key to open key table.
+    TestOMRequestUtils.addKeyToTable(true, volumeName, bucketName,
+        keyName, clientID, HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.ONE, omMetadataManager);
+
+    OMClientResponse omClientResponse =
+        s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager,
+            2L);
+
+
+    Assert.assertTrue(omClientResponse.getOMResponse().getStatus()
+        == OzoneManagerProtocolProtos.Status.NO_SUCH_MULTIPART_UPLOAD_ERROR);
+
+    String multipartKey = omMetadataManager.getMultipartKey(volumeName,
+        bucketName, keyName, multipartUploadID);
+
+    Assert.assertNull(
+        omMetadataManager.getMultipartInfoTable().get(multipartKey));
+
+  }
+
+  @Test
+  public void testValidateAndUpdateCacheKeyNotFound() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    TestOMRequestUtils.addVolumeAndBucketToDB(volumeName, bucketName,
+        omMetadataManager);
+
+
+    long clientID = Time.now();
+    String multipartUploadID = UUID.randomUUID().toString();
+
+    OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName,
+        bucketName, keyName, clientID, multipartUploadID, 1);
+
+    // Don't add key to open table entry, and we are trying to commit this MPU
+    // part. It will fail with KEY_NOT_FOUND
+
+    S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+        new S3MultipartUploadCommitPartRequest(commitMultipartRequest);
+
+
+    OMClientResponse omClientResponse =
+        s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager,
+            2L);
+
+    Assert.assertTrue(omClientResponse.getOMResponse().getStatus()
+        == OzoneManagerProtocolProtos.Status.KEY_NOT_FOUND);
+
+  }
+
+
+  @Test
+  public void testValidateAndUpdateCacheBucketFound() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    TestOMRequestUtils.addVolumeToDB(volumeName, omMetadataManager);
+
+
+    long clientID = Time.now();
+    String multipartUploadID = UUID.randomUUID().toString();
+
+    OMRequest commitMultipartRequest = doPreExecuteCommitMPU(volumeName,
+        bucketName, keyName, clientID, multipartUploadID, 1);
+
+    // Don't add key to open table entry, and we are trying to commit this MPU
+    // part. It will fail with KEY_NOT_FOUND
+
+    S3MultipartUploadCommitPartRequest s3MultipartUploadCommitPartRequest =
+        new S3MultipartUploadCommitPartRequest(commitMultipartRequest);
+
+
+    OMClientResponse omClientResponse =
+        s3MultipartUploadCommitPartRequest.validateAndUpdateCache(ozoneManager,
+            2L);
+
+    Assert.assertTrue(omClientResponse.getOMResponse().getStatus()
+        == OzoneManagerProtocolProtos.Status.BUCKET_NOT_FOUND);
+
+  }
+}