Bladeren bron

HDDS-916. MultipartUpload: Complete Multipart upload request. Contributed by Bharat Viswanadham.

Márton Elek 6 jaren geleden
bovenliggende
commit
d14c56d150
19 gewijzigde bestanden met toevoegingen van 883 en 8 verwijderingen
  1. 4 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
  2. 26 0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
  3. 27 0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
  4. 10 0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
  5. 27 0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
  6. 2 1
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
  7. 4 3
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java
  8. 70 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadCompleteInfo.java
  9. 63 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadList.java
  10. 20 1
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
  11. 52 1
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
  12. 25 0
      hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
  13. 270 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
  14. 19 1
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
  15. 157 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
  16. 11 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
  17. 27 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
  18. 4 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
  19. 65 1
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java

+ 4 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java

@@ -237,10 +237,14 @@ public final class OzoneConsts {
   public static final String REPLICATION_TYPE = "replicationType";
   public static final String REPLICATION_FACTOR = "replicationFactor";
   public static final String KEY_LOCATION_INFO = "keyLocationInfo";
+  public static final String MULTIPART_LIST = "multipartList";
 
 
   // For OM metrics saving to a file
   public static final String OM_METRICS_FILE = "omMetrics";
   public static final String OM_METRICS_TEMP_FILE = OM_METRICS_FILE + ".tmp";
 
+  // For Multipart upload
+  public static final int OM_MULTIPART_MIN_SIZE = 5 * 1024 * 1024;
+
 }

+ 26 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java

@@ -31,10 +31,12 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 
 /**
@@ -353,6 +355,15 @@ public class OzoneBucket {
         defaultReplication);
   }
 
+  /**
+   * Create a part key for a multipart upload key.
+   * @param key
+   * @param size
+   * @param partNumber
+   * @param uploadID
+   * @return OzoneOutputStream
+   * @throws IOException
+   */
   public OzoneOutputStream createMultipartKey(String key, long size,
                                               int partNumber, String uploadID)
       throws IOException {
@@ -360,6 +371,21 @@ public class OzoneBucket {
         uploadID);
   }
 
+  /**
+   * Complete Multipart upload. This will combine all the parts and make the
+   * key visible in ozone.
+   * @param key
+   * @param uploadID
+   * @param partsMap
+   * @return OmMultipartUploadCompleteInfo
+   * @throws IOException
+   */
+  public OmMultipartUploadCompleteInfo completeMultipartUpload(String key,
+      String uploadID, Map<Integer, String> partsMap) throws IOException {
+    return proxy.completeMultipartUpload(volumeName, name, key, uploadID,
+        partsMap);
+  }
+
   /**
    * An Iterator to iterate over {@link OzoneKey} list.
    */

+ 27 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java

@@ -27,9 +27,11 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 /**
  * An implementer of this interface is capable of connecting to Ozone Cluster
@@ -401,10 +403,35 @@ public interface ClientProtocol {
       bucketName, String keyName, ReplicationType type, ReplicationFactor
       factor) throws IOException;
 
+  /**
+   * Create a part key for a multipart upload key.
+   * @param volumeName
+   * @param bucketName
+   * @param keyName
+   * @param size
+   * @param partNumber
+   * @param uploadID
+   * @return OzoneOutputStream
+   * @throws IOException
+   */
   OzoneOutputStream createMultipartKey(String volumeName, String bucketName,
                                        String keyName, long size,
                                        int partNumber, String uploadID)
       throws IOException;
 
+  /**
+   * Complete Multipart upload. This will combine all the parts and make the
+   * key visible in ozone.
+   * @param volumeName
+   * @param bucketName
+   * @param keyName
+   * @param uploadID
+   * @param partsMap
+   * @return OmMultipartUploadCompleteInfo
+   * @throws IOException
+   */
+  OmMultipartUploadCompleteInfo completeMultipartUpload(String volumeName,
+      String bucketName, String keyName, String uploadID,
+      Map<Integer, String> partsMap) throws IOException;
 
 }

+ 10 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails;
 import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
 import org.apache.hadoop.ozone.web.response.ListBuckets;
@@ -79,6 +80,7 @@ import java.net.URISyntaxException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
@@ -975,4 +977,12 @@ public class RestClient implements ClientProtocol {
     throw new UnsupportedOperationException("Ozone REST protocol does not " +
         "support this operation.");
   }
+
+  @Override
+  public OmMultipartUploadCompleteInfo completeMultipartUpload(
+      String volumeName, String bucketName, String keyName, String uploadID,
+      Map<Integer, String> partsMap) throws IOException {
+    throw new UnsupportedOperationException("Ozone REST protocol does not " +
+        "support this operation.");
+  }
 }

+ 27 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -50,6 +50,8 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@@ -753,4 +755,29 @@ public class RpcClient implements ClientProtocol {
     return new OzoneOutputStream(groupOutputStream);
   }
 
+  @Override
+  public OmMultipartUploadCompleteInfo completeMultipartUpload(
+      String volumeName, String bucketName, String keyName, String uploadID,
+      Map<Integer, String> partsMap) throws IOException {
+    HddsClientUtils.verifyResourceName(volumeName, bucketName);
+    HddsClientUtils.checkNotNull(keyName, uploadID);
+
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setMultipartUploadID(uploadID)
+        .build();
+
+    OmMultipartUploadList omMultipartUploadList = new OmMultipartUploadList(
+        partsMap);
+
+    OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo =
+        ozoneManagerClient.completeMultipartUpload(keyArgs,
+            omMultipartUploadList);
+
+    return omMultipartUploadCompleteInfo;
+
+  }
+
 }

+ 2 - 1
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java

@@ -48,7 +48,8 @@ public enum OMAction implements AuditAction {
   READ_KEY,
   LIST_S3BUCKETS,
   INITIATE_MULTIPART_UPLOAD,
-  COMMIT_MULTIPART_UPLOAD_PARTKEY;
+  COMMIT_MULTIPART_UPLOAD_PARTKEY,
+  COMPLETE_MULTIPART_UPLOAD;
 
   @Override
   public String getAction() {

+ 4 - 3
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TreeMap;
 
 /**
  * This class represents multipart upload information for a key, which holds
@@ -30,7 +31,7 @@ import java.util.Map;
  */
 public class OmMultipartKeyInfo {
   private String uploadID;
-  private Map<Integer, PartKeyInfo> partKeyInfoList;
+  private TreeMap<Integer, PartKeyInfo> partKeyInfoList;
 
   /**
    * Construct OmMultipartKeyInfo object which holds multipart upload
@@ -40,7 +41,7 @@ public class OmMultipartKeyInfo {
    */
   public OmMultipartKeyInfo(String id, Map<Integer, PartKeyInfo> list) {
     this.uploadID = id;
-    this.partKeyInfoList = new HashMap<>(list);
+    this.partKeyInfoList = new TreeMap<>(list);
   }
 
   /**
@@ -51,7 +52,7 @@ public class OmMultipartKeyInfo {
     return uploadID;
   }
 
-  public Map<Integer, PartKeyInfo> getPartKeyInfoList() {
+  public TreeMap<Integer, PartKeyInfo> getPartKeyInfoList() {
     return partKeyInfoList;
   }
 

+ 70 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadCompleteInfo.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.helpers;
+
+/**
+ * This class holds information about the response of complete Multipart
+ * upload request.
+ */
+public class OmMultipartUploadCompleteInfo {
+
+  private String volume;
+  private String bucket;
+  private String key;
+  private String hash; // this is used as ETag for S3.
+
+  public OmMultipartUploadCompleteInfo(String volumeName, String bucketName,
+                                       String keyName, String md5) {
+    this.volume = volumeName;
+    this.bucket = bucketName;
+    this.key = keyName;
+    this.hash = md5;
+  }
+
+  public String getVolume() {
+    return volume;
+  }
+
+  public void setVolume(String volume) {
+    this.volume = volume;
+  }
+
+  public String getBucket() {
+    return bucket;
+  }
+
+  public void setBucket(String bucket) {
+    this.bucket = bucket;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public void setKey(String key) {
+    this.key = key;
+  }
+
+  public String getHash() {
+    return hash;
+  }
+
+  public void setHash(String hash) {
+    this.hash = hash;
+  }
+}

+ 63 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadList.java

@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.helpers;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Part;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * This class represents multipart list, which is required for
+ * CompleteMultipart upload request.
+ */
+public class OmMultipartUploadList {
+
+  private final TreeMap<Integer, String> multipartMap;
+
+  /**
+   * Construct OmMultipartUploadList which holds multipart map which contains
+   * part number and part name.
+   * @param partMap
+   */
+  public OmMultipartUploadList(Map<Integer, String> partMap) {
+    this.multipartMap = new TreeMap<>(partMap);
+  }
+
+  /**
+   * Return multipartMap which is a map of part number and part name.
+   * @return multipartMap
+   */
+  public TreeMap<Integer, String> getMultipartMap() {
+    return multipartMap;
+  }
+
+  /**
+   * Construct Part list from the multipartMap.
+   * @return List<Part>
+   */
+  public List<Part> getPartsList() {
+    List<Part> partList = new ArrayList<>();
+    multipartMap.forEach((partNumber, partName) -> partList.add(Part
+        .newBuilder().setPartName(partName).setPartNumber(partNumber).build()));
+    return partList;
+  }
+}

+ 20 - 1
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java

@@ -23,6 +23,8 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@@ -315,9 +317,26 @@ public interface OzoneManagerProtocol {
   OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException;
 
 
-
+  /**
+   * Commit Multipart upload part file.
+   * @param omKeyArgs
+   * @param clientID
+   * @return OmMultipartCommitUploadPartInfo
+   * @throws IOException
+   */
   OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
       OmKeyArgs omKeyArgs, long clientID) throws IOException;
 
+  /**
+   * Complete Multipart upload Request.
+   * @param omKeyArgs
+   * @param multipartUploadList
+   * @return OmMultipartUploadCompleteInfo
+   * @throws IOException
+   */
+  OmMultipartUploadCompleteInfo completeMultipartUpload(
+      OmKeyArgs omKeyArgs, OmMultipartUploadList multipartUploadList)
+      throws IOException;
+
 }
 

+ 52 - 1
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java

@@ -32,6 +32,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@@ -88,6 +90,10 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartInfoInitiateRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartInfoInitiateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartUploadCompleteRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartUploadCompleteResponse;
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.RenameKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto
@@ -970,6 +976,11 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
   @Override
   public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
       OmKeyArgs omKeyArgs, long clientId) throws IOException {
+
+    List<OmKeyLocationInfo> locationInfoList = omKeyArgs.getLocationInfoList();
+    Preconditions.checkNotNull(locationInfoList);
+
+
     MultipartCommitUploadPartRequest.Builder multipartCommitUploadPartRequest
         = MultipartCommitUploadPartRequest.newBuilder();
 
@@ -979,7 +990,11 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
         .setKeyName(omKeyArgs.getKeyName())
         .setMultipartUploadID(omKeyArgs.getMultipartUploadID())
         .setIsMultipartKey(omKeyArgs.getIsMultipartKey())
-        .setMultipartNumber(omKeyArgs.getMultipartUploadPartNumber());
+        .setMultipartNumber(omKeyArgs.getMultipartUploadPartNumber())
+        .setDataSize(omKeyArgs.getDataSize())
+        .addAllKeyLocations(
+            locationInfoList.stream().map(OmKeyLocationInfo::getProtobuf)
+                .collect(Collectors.toList()));
     multipartCommitUploadPartRequest.setClientID(clientId);
     multipartCommitUploadPartRequest.setKeyArgs(keyArgs.build());
 
@@ -1002,6 +1017,42 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
     return info;
   }
 
+  @Override
+  public OmMultipartUploadCompleteInfo completeMultipartUpload(
+      OmKeyArgs omKeyArgs, OmMultipartUploadList multipartUploadList)
+      throws IOException {
+    MultipartUploadCompleteRequest.Builder multipartUploadCompleteRequest =
+        MultipartUploadCompleteRequest.newBuilder();
+
+    KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
+        .setVolumeName(omKeyArgs.getVolumeName())
+        .setBucketName(omKeyArgs.getBucketName())
+        .setKeyName(omKeyArgs.getKeyName())
+        .setMultipartUploadID(omKeyArgs.getMultipartUploadID());
+
+    multipartUploadCompleteRequest.setKeyArgs(keyArgs.build());
+    multipartUploadCompleteRequest.addAllPartsList(multipartUploadList
+        .getPartsList());
+
+    OMRequest omRequest = createOMRequest(
+        Type.CompleteMultiPartUpload)
+        .setCompleteMultiPartUploadRequest(
+            multipartUploadCompleteRequest.build()).build();
+
+    MultipartUploadCompleteResponse response = submitRequest(omRequest)
+        .getCompleteMultiPartUploadResponse();
+
+    if (response.getStatus() != Status.OK) {
+      throw new IOException("Complete multipart upload failed, error:" +
+          response.getStatus());
+    }
+
+    OmMultipartUploadCompleteInfo info = new
+        OmMultipartUploadCompleteInfo(response.getVolume(), response
+        .getBucket(), response.getKey(), response.getHash());
+    return info;
+  }
+
   public List<ServiceInfo> getServiceList() throws IOException {
     ServiceListRequest req = ServiceListRequest.newBuilder().build();
 

+ 25 - 0
hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto

@@ -65,6 +65,7 @@ enum Type {
   ListS3Buckets = 44;
   InitiateMultiPartUpload = 45;
   CommitMultiPartUpload = 46;
+  CompleteMultiPartUpload = 47;
 
   ServiceList = 51;
 }
@@ -105,6 +106,7 @@ message OMRequest {
   optional S3ListBucketsRequest             listS3BucketsRequest           = 44;
   optional MultipartInfoInitiateRequest     initiateMultiPartUploadRequest = 45;
   optional MultipartCommitUploadPartRequest commitMultiPartUploadRequest   = 46;
+  optional MultipartUploadCompleteRequest   completeMultiPartUploadRequest = 47;
 
   optional ServiceListRequest               serviceListRequest             = 51;
 }
@@ -146,6 +148,7 @@ message OMResponse {
   optional S3ListBucketsResponse             listS3BucketsResponse         = 44;
   optional MultipartInfoInitiateResponse   initiateMultiPartUploadResponse = 45;
   optional MultipartCommitUploadPartResponse commitMultiPartUploadResponse = 46;
+  optional MultipartUploadCompleteResponse completeMultiPartUploadResponse = 47;
 
   optional ServiceListResponse               ServiceListResponse           = 51;
 }
@@ -177,6 +180,10 @@ enum Status {
     INITIATE_MULTIPART_UPLOAD_ERROR = 24;
     MULTIPART_UPLOAD_PARTFILE_ERROR = 25;
     NO_SUCH_MULTIPART_UPLOAD_ERROR = 26;
+    MISMATCH_MULTIPART_LIST = 27;
+    MISSING_UPLOAD_PARTS = 28;
+    COMPLETE_MULTIPART_UPLOAD_ERROR = 29;
+    ENTITY_TOO_SMALL = 30;
 }
 
 
@@ -583,6 +590,24 @@ message MultipartCommitUploadPartResponse {
     required Status status = 2;
 }
 
+message MultipartUploadCompleteRequest {
+    required KeyArgs keyArgs = 1;
+    repeated Part partsList = 2;
+}
+
+message MultipartUploadCompleteResponse {
+    optional string volume = 1;
+    optional string bucket = 2;
+    optional string key = 3;
+    optional string hash = 4; // This will be used as etag for s3
+    required Status status = 5;
+}
+
+message Part {
+    required uint32 partNumber = 1;
+    required string partName = 2;
+}
+
 /**
  The OM service that takes care of Ozone namespace.
 */

+ 270 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java

@@ -69,8 +69,11 @@ import org.junit.rules.ExpectedException;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 
 import static org.hamcrest.CoreMatchers.containsString;
@@ -1497,7 +1500,274 @@ public class TestOzoneRpcClient {
       GenericTestUtils.assertExceptionContains("NO_SUCH_MULTIPART_UPLOAD_ERROR",
           ex);
     }
+  }
+
+  @Test
+  public void testMultipartUpload() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    doMultipartUpload(bucket, keyName, (byte)98);
+
+  }
+
+
+  @Test
+  public void testMultipartUploadOverride() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    doMultipartUpload(bucket, keyName, (byte)96);
+
+    // Initiate Multipart upload again, now we should read latest version, as
+    // read always reads latest blocks.
+    doMultipartUpload(bucket, keyName, (byte)97);
+
+  }
+
+
+  @Test
+  public void testMultipartUploadWithPartsLessThanMinSize() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    // Initiate multipart upload
+    String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
+        .STAND_ALONE, ReplicationFactor.ONE);
+
+    // Upload Parts
+    Map<Integer, String> partsMap = new TreeMap<>();
+    // Uploading part 1 with less than min size
+    String partName = uploadPart(bucket, keyName, uploadID, 1, "data".getBytes(
+        "UTF-8"));
+    partsMap.put(1, partName);
+
+    partName = uploadPart(bucket, keyName, uploadID, 2, "data".getBytes(
+        "UTF-8"));
+    partsMap.put(2, partName);
+
+
+    // Complete multipart upload
+
+    try {
+      completeMultipartUpload(bucket, keyName, uploadID, partsMap);
+      fail("testMultipartUploadWithPartsLessThanMinSize failed");
+    } catch (IOException ex) {
+      GenericTestUtils.assertExceptionContains("ENTITY_TOO_SMALL", ex);
+    }
+
+  }
+
+
+
+  @Test
+  public void testMultipartUploadWithPartsMisMatchWithListSizeDifferent()
+      throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
+        .STAND_ALONE, ReplicationFactor.ONE);
+
+    // We have not uploaded any parts, but passing some list it should throw
+    // error.
+    TreeMap<Integer, String> partsMap = new TreeMap<>();
+    partsMap.put(1, UUID.randomUUID().toString());
+
+    try {
+      completeMultipartUpload(bucket, keyName, uploadID, partsMap);
+      fail("testMultipartUploadWithPartsMisMatch");
+    } catch (IOException ex) {
+      GenericTestUtils.assertExceptionContains("MISMATCH_MULTIPART_LIST", ex);
+    }
+
+  }
+
+  @Test
+  public void testMultipartUploadWithPartsMisMatchWithIncorrectPartName()
+      throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
+        .STAND_ALONE, ReplicationFactor.ONE);
+
+    uploadPart(bucket, keyName, uploadID, 1, "data".getBytes("UTF-8"));
+    // We have not uploaded any parts, but passing some list it should throw
+    // error.
+    TreeMap<Integer, String> partsMap = new TreeMap<>();
+    partsMap.put(1, UUID.randomUUID().toString());
+
+    try {
+      completeMultipartUpload(bucket, keyName, uploadID, partsMap);
+      fail("testMultipartUploadWithPartsMisMatch");
+    } catch (IOException ex) {
+      GenericTestUtils.assertExceptionContains("MISMATCH_MULTIPART_LIST", ex);
+    }
+
+  }
+
+  @Test
+  public void testMultipartUploadWithMissingParts() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
+        .STAND_ALONE, ReplicationFactor.ONE);
+
+    uploadPart(bucket, keyName, uploadID, 1, "data".getBytes("UTF-8"));
+    // We have not uploaded any parts, but passing some list it should throw
+    // error.
+    TreeMap<Integer, String> partsMap = new TreeMap<>();
+    partsMap.put(3, "random");
+
+    try {
+      completeMultipartUpload(bucket, keyName, uploadID, partsMap);
+      fail("testMultipartUploadWithPartsMisMatch");
+    } catch (IOException ex) {
+      GenericTestUtils.assertExceptionContains("MISSING_UPLOAD_PARTS", ex);
+    }
+  }
+
+
+  private byte[] generateData(int size, byte val) {
+    byte[] chars = new byte[size];
+    Arrays.fill(chars, val);
+    return chars;
+  }
+
+
+  private void doMultipartUpload(OzoneBucket bucket, String keyName, byte val)
+      throws Exception {
+    // Initiate Multipart upload request
+    String uploadID = initiateMultipartUpload(bucket, keyName, ReplicationType
+        .RATIS, ReplicationFactor.THREE);
+
+    // Upload parts
+    Map<Integer, String> partsMap = new TreeMap<>();
+
+    // get 5mb data, as each part should be of min 5mb, last part can be less
+    // than 5mb
+    int length = 0;
+    byte[] data = generateData(OzoneConsts.OM_MULTIPART_MIN_SIZE, val);
+    String partName = uploadPart(bucket, keyName, uploadID, 1, data);
+    partsMap.put(1, partName);
+    length += data.length;
+
+
+    partName = uploadPart(bucket, keyName, uploadID, 2, data);
+    partsMap.put(2, partName);
+    length += data.length;
+
+    String part3 = UUID.randomUUID().toString();
+    partName = uploadPart(bucket, keyName, uploadID, 3, part3.getBytes(
+        "UTF-8"));
+    partsMap.put(3, partName);
+    length += part3.getBytes("UTF-8").length;
+
+
+    // Complete multipart upload request
+    completeMultipartUpload(bucket, keyName, uploadID, partsMap);
+
+
+    //Now Read the key which has been completed multipart upload.
+    byte[] fileContent = new byte[data.length + data.length + part3.getBytes(
+        "UTF-8").length];
+    OzoneInputStream inputStream = bucket.readKey(keyName);
+    inputStream.read(fileContent);
+
+    Assert.assertTrue(verifyRatisReplication(bucket.getVolumeName(),
+        bucket.getName(), keyName, ReplicationType.RATIS,
+        ReplicationFactor.THREE));
+
+    StringBuilder sb = new StringBuilder(length);
+
+    // Combine all parts data, and check is it matching with get key data.
+    String part1 = new String(data);
+    String part2 = new String(data);
+    sb.append(part1);
+    sb.append(part2);
+    sb.append(part3);
+    Assert.assertEquals(sb.toString(), new String(fileContent));
+  }
+
+
+  private String initiateMultipartUpload(OzoneBucket bucket, String keyName,
+      ReplicationType replicationType, ReplicationFactor replicationFactor)
+      throws Exception {
+    OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
+        replicationType, replicationFactor);
+
+    String uploadID = multipartInfo.getUploadID();
+    Assert.assertNotNull(uploadID);
+    return uploadID;
+  }
+
+  private String uploadPart(OzoneBucket bucket, String keyName, String
+      uploadID, int partNumber, byte[] data) throws Exception {
+    OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
+        data.length, partNumber, uploadID);
+    ozoneOutputStream.write(data, 0,
+        data.length);
+    ozoneOutputStream.close();
+
+    OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
+        ozoneOutputStream.getCommitUploadPartInfo();
+
+    Assert.assertNotNull(omMultipartCommitUploadPartInfo);
+    Assert.assertNotNull(omMultipartCommitUploadPartInfo.getPartName());
+    return omMultipartCommitUploadPartInfo.getPartName();
+
+  }
 
+  private void completeMultipartUpload(OzoneBucket bucket, String keyName,
+      String uploadID, Map<Integer, String> partsMap) throws Exception {
+    OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = bucket
+        .completeMultipartUpload(keyName, uploadID, partsMap);
+
+    Assert.assertNotNull(omMultipartUploadCompleteInfo);
+    Assert.assertEquals(omMultipartUploadCompleteInfo.getBucket(), bucket
+        .getName());
+    Assert.assertEquals(omMultipartUploadCompleteInfo.getVolume(), bucket
+        .getVolumeName());
+    Assert.assertEquals(omMultipartUploadCompleteInfo.getKey(), keyName);
+    Assert.assertNotNull(omMultipartUploadCompleteInfo.getHash());
   }
 
 

+ 19 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java

@@ -23,6 +23,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.utils.BackgroundService;
 
@@ -191,8 +193,24 @@ public interface KeyManager {
    */
   OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException;
 
+  /**
+   * Commit Multipart upload part file.
+   * @param omKeyArgs
+   * @param clientID
+   * @return OmMultipartCommitUploadPartInfo
+   * @throws IOException
+   */
 
   OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
-      OmKeyArgs keyArgs, long clientID) throws IOException;
+      OmKeyArgs omKeyArgs, long clientID) throws IOException;
 
+  /**
+   * Complete Multipart upload Request.
+   * @param omKeyArgs
+   * @param multipartUploadList
+   * @return OmMultipartUploadCompleteInfo
+   * @throws IOException
+   */
+  OmMultipartUploadCompleteInfo completeMultipartUpload(OmKeyArgs omKeyArgs,
+      OmMultipartUploadList multipartUploadList) throws IOException;
 }

+ 157 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java

@@ -22,9 +22,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -43,6 +45,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .PartKeyInfo;
@@ -62,6 +66,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MA
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -677,6 +682,10 @@ public class KeyManagerImpl implements KeyManager {
       OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(
           openKey);
 
+      // set the data size and location info list
+      keyInfo.setDataSize(omKeyArgs.getDataSize());
+      keyInfo.updateLocationInfoList(omKeyArgs.getLocationInfoList());
+
       partName = keyName + clientID;
       if (multipartKeyInfo == null) {
         throw new OMException("No such Multipart upload is with specified " +
@@ -729,4 +738,152 @@ public class KeyManagerImpl implements KeyManager {
 
   }
 
+  @Override
+  public OmMultipartUploadCompleteInfo completeMultipartUpload(
+      OmKeyArgs omKeyArgs, OmMultipartUploadList multipartUploadList)
+      throws IOException {
+    Preconditions.checkNotNull(omKeyArgs);
+    Preconditions.checkNotNull(multipartUploadList);
+    String volumeName = omKeyArgs.getVolumeName();
+    String bucketName = omKeyArgs.getBucketName();
+    String keyName = omKeyArgs.getKeyName();
+    String uploadID = omKeyArgs.getMultipartUploadID();
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    try {
+      String multipartKey = metadataManager.getMultipartKey(volumeName,
+          bucketName, keyName, uploadID);
+      String ozoneKey = metadataManager.getOzoneKey(volumeName, bucketName,
+          keyName);
+      OmKeyInfo keyInfo = metadataManager.getKeyTable().get(ozoneKey);
+
+      OmMultipartKeyInfo multipartKeyInfo = metadataManager
+          .getMultipartInfoTable().get(multipartKey);
+      if (multipartKeyInfo == null) {
+        throw new OMException("Complete Multipart Upload Failed: volume: " +
+            volumeName + "bucket: " + bucketName + "key: " + keyName,
+            ResultCodes.NO_SUCH_MULTIPART_UPLOAD);
+      }
+      TreeMap<Integer, PartKeyInfo> partKeyInfoMap = multipartKeyInfo
+          .getPartKeyInfoList();
+
+      TreeMap<Integer, String> multipartMap = multipartUploadList
+          .getMultipartMap();
+
+      // Last key in the map should be having key value as size, as map's
+      // are sorted. Last entry in both maps should have partNumber as size
+      // of the map. As we have part entries 1, 2, 3, 4 and then we get
+      // complete multipart upload request so the map last entry should have 4,
+      // if it is having value greater or less than map size, then there is
+      // some thing wrong throw error.
+
+      Map.Entry<Integer, String> multipartMapLastEntry = multipartMap
+          .lastEntry();
+      Map.Entry<Integer, PartKeyInfo> partKeyInfoLastEntry = partKeyInfoMap
+          .lastEntry();
+      if (partKeyInfoMap.size() != multipartMap.size()) {
+        throw new OMException("Complete Multipart Upload Failed: volume: " +
+            volumeName + "bucket: " + bucketName + "key: " + keyName,
+            ResultCodes.MISMATCH_MULTIPART_LIST);
+      }
+
+      // Last entry part Number should be the size of the map, otherwise this
+      // means we have missing some parts but we got a complete request.
+      if (multipartMapLastEntry.getKey() != partKeyInfoMap.size() ||
+          partKeyInfoLastEntry.getKey() != partKeyInfoMap.size()) {
+        throw new OMException("Complete Multipart Upload Failed: volume: " +
+            volumeName + "bucket: " + bucketName + "key: " + keyName,
+            ResultCodes.MISSING_UPLOAD_PARTS);
+      }
+      ReplicationType type = partKeyInfoLastEntry.getValue().getPartKeyInfo()
+          .getType();
+      ReplicationFactor factor = partKeyInfoLastEntry.getValue()
+          .getPartKeyInfo().getFactor();
+      List<OmKeyLocationInfo> locations = new ArrayList<>();
+      long size = 0;
+      int partsCount =1;
+      int partsMapSize = partKeyInfoMap.size();
+      for(Map.Entry<Integer, PartKeyInfo> partKeyInfoEntry : partKeyInfoMap
+          .entrySet()) {
+        int partNumber = partKeyInfoEntry.getKey();
+        PartKeyInfo partKeyInfo = partKeyInfoEntry.getValue();
+        // Check we have all parts to complete multipart upload and also
+        // check partNames provided match with actual part names
+        String providedPartName = multipartMap.get(partNumber);
+        String actualPartName = partKeyInfo.getPartName();
+        if (partNumber == partsCount) {
+          if (!actualPartName.equals(providedPartName)) {
+            throw new OMException("Complete Multipart Upload Failed: volume: " +
+                volumeName + "bucket: " + bucketName + "key: " + keyName,
+                ResultCodes.MISMATCH_MULTIPART_LIST);
+          }
+          OmKeyInfo currentPartKeyInfo = OmKeyInfo
+              .getFromProtobuf(partKeyInfo.getPartKeyInfo());
+          // Check if any part size is less than 5mb, last part can be less
+          // than 5 mb.
+          if (partsCount != partsMapSize &&
+              currentPartKeyInfo.getDataSize() < OM_MULTIPART_MIN_SIZE) {
+            throw new OMException("Complete Multipart Upload Failed: Entity " +
+                "too small: volume: " + volumeName + "bucket: " + bucketName
+                + "key: " + keyName, ResultCodes.ENTITY_TOO_SMALL);
+          }
+          // As all part keys will have only one version.
+          OmKeyLocationInfoGroup currentKeyInfoGroup = currentPartKeyInfo
+              .getKeyLocationVersions().get(0);
+          locations.addAll(currentKeyInfoGroup.getLocationList());
+          size += currentPartKeyInfo.getDataSize();
+        } else {
+          throw new OMException("Complete Multipart Upload Failed: volume: " +
+              volumeName + "bucket: " + bucketName + "key: " + keyName,
+              ResultCodes.MISSING_UPLOAD_PARTS);
+        }
+        partsCount++;
+      }
+      if (keyInfo == null) {
+        // This is a newly added key, it does not have any versions.
+        OmKeyLocationInfoGroup keyLocationInfoGroup = new
+            OmKeyLocationInfoGroup(0, locations);
+        // A newly created key, this is the first version.
+        keyInfo = new OmKeyInfo.Builder()
+            .setVolumeName(omKeyArgs.getVolumeName())
+            .setBucketName(omKeyArgs.getBucketName())
+            .setKeyName(omKeyArgs.getKeyName())
+            .setReplicationFactor(factor)
+            .setReplicationType(type)
+            .setCreationTime(Time.now())
+            .setModificationTime(Time.now())
+            .setDataSize(size)
+            .setOmKeyLocationInfos(
+                Collections.singletonList(keyLocationInfoGroup))
+            .build();
+      } else {
+        // Already a version exists, so we should add it as a new version.
+        // But now as versioning is not supported, just following the commit
+        // key approach.
+        // When versioning support comes, then we can uncomment below code
+        // keyInfo.addNewVersion(locations);
+        keyInfo.updateLocationInfoList(locations);
+      }
+      DBStore store = metadataManager.getStore();
+      try (BatchOperation batch = store.initBatchOperation()) {
+        //Remove entry in multipart table and add a entry in to key table
+        metadataManager.getMultipartInfoTable().deleteWithBatch(batch,
+            multipartKey);
+        metadataManager.getKeyTable().putWithBatch(batch,
+            ozoneKey, keyInfo);
+        store.commitBatchOperation(batch);
+      }
+      return new OmMultipartUploadCompleteInfo(omKeyArgs.getVolumeName(),
+          omKeyArgs.getBucketName(), omKeyArgs.getKeyName(), DigestUtils
+              .sha256Hex(keyName));
+    } catch (OMException ex) {
+      throw ex;
+    } catch (IOException ex) {
+      LOG.error("Complete Multipart Upload Failed: volume: " + volumeName +
+          "bucket: " + bucketName + "key: " + keyName, ex);
+      throw new OMException(ex.getMessage(), ResultCodes
+          .COMPLETE_MULTIPART_UPLOAD_FAILED);
+    } finally {
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+    }
+  }
 }

+ 11 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java

@@ -62,6 +62,7 @@ public class OMMetrics {
   private @Metric MutableCounterLong numGetServiceLists;
   private @Metric MutableCounterLong numListS3Buckets;
   private @Metric MutableCounterLong numInitiateMultipartUploads;
+  private @Metric MutableCounterLong numCompleteMultipartUploads;
 
 
   // Failure Metrics
@@ -88,6 +89,7 @@ public class OMMetrics {
   private @Metric MutableCounterLong numInitiateMultipartUploadFails;
   private @Metric MutableCounterLong numCommitMultipartUploadParts;
   private @Metric MutableCounterLong getNumCommitMultipartUploadPartFails;
+  private @Metric MutableCounterLong numCompleteMultipartUploadFails;
 
   // Metrics for total number of volumes, buckets and keys
 
@@ -247,6 +249,15 @@ public class OMMetrics {
     numInitiateMultipartUploadFails.incr();
   }
 
+  public void incNumCompleteMultipartUploads() {
+    numKeyOps.incr();
+    numCompleteMultipartUploads.incr();
+  }
+
+  public void incNumCompleteMultipartUploadFails() {
+    numCompleteMultipartUploadFails.incr();
+  }
+
   public void incNumGetServiceLists() {
     numGetServiceLists.incr();
   }

+ 27 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -63,6 +63,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@@ -1645,6 +1647,31 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     return commitUploadPartInfo;
   }
 
+  @Override
+  public OmMultipartUploadCompleteInfo completeMultipartUpload(
+      OmKeyArgs omKeyArgs, OmMultipartUploadList multipartUploadList)
+      throws IOException {
+    OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo;
+    metrics.incNumCompleteMultipartUploads();
+
+    Map<String, String> auditMap = (omKeyArgs == null) ? new LinkedHashMap<>() :
+        omKeyArgs.toAuditMap();
+    auditMap.put(OzoneConsts.MULTIPART_LIST, multipartUploadList
+        .getMultipartMap().toString());
+    try {
+      omMultipartUploadCompleteInfo = keyManager.completeMultipartUpload(
+          omKeyArgs, multipartUploadList);
+      AUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMAction
+          .COMPLETE_MULTIPART_UPLOAD, auditMap));
+      return omMultipartUploadCompleteInfo;
+    } catch (IOException ex) {
+      metrics.incNumCompleteMultipartUploadFails();
+      AUDIT.logWriteFailure(buildAuditMessageForFailure(OMAction
+          .COMPLETE_MULTIPART_UPLOAD, auditMap, ex));
+      throw ex;
+    }
+  }
+
 
 
 

+ 4 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java

@@ -119,6 +119,10 @@ public class OMException extends IOException {
     INITIATE_MULTIPART_UPLOAD_FAILED,
     NO_SUCH_MULTIPART_UPLOAD,
     UPLOAD_PART_FAILED,
+    MISMATCH_MULTIPART_LIST,
+    MISSING_UPLOAD_PARTS,
+    COMPLETE_MULTIPART_UPLOAD_FAILED,
+    ENTITY_TOO_SMALL,
     INVALID_REQUEST;
   }
 }

+ 65 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java

@@ -28,6 +28,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
@@ -100,6 +102,10 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartCommitUploadPartRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartCommitUploadPartResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartUploadCompleteRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartUploadCompleteResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartInfoInitiateRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -108,6 +114,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Part;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .RenameKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -149,6 +156,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 /**
@@ -331,6 +339,13 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
       responseBuilder.setCommitMultiPartUploadResponse(
           commitUploadPartResponse);
       break;
+    case CompleteMultiPartUpload:
+      MultipartUploadCompleteResponse completeMultipartUploadResponse =
+          completeMultipartUpload(
+              request.getCompleteMultiPartUploadRequest());
+      responseBuilder.setCompleteMultiPartUploadResponse(
+          completeMultipartUploadResponse);
+      break;
     case ServiceList:
       ServiceListResponse serviceListResponse = getServiceList(
           request.getServiceListRequest());
@@ -392,7 +407,14 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
         return Status.NO_SUCH_MULTIPART_UPLOAD_ERROR;
       case UPLOAD_PART_FAILED:
         return Status.MULTIPART_UPLOAD_PARTFILE_ERROR;
-
+      case COMPLETE_MULTIPART_UPLOAD_FAILED:
+        return Status.COMPLETE_MULTIPART_UPLOAD_ERROR;
+      case MISMATCH_MULTIPART_LIST:
+        return Status.MISMATCH_MULTIPART_LIST;
+      case MISSING_UPLOAD_PARTS:
+        return Status.MISSING_UPLOAD_PARTS;
+      case ENTITY_TOO_SMALL:
+        return Status.ENTITY_TOO_SMALL;
       default:
         return Status.INTERNAL_ERROR;
       }
@@ -839,6 +861,10 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
           .setMultipartUploadID(keyArgs.getMultipartUploadID())
           .setIsMultipartKey(keyArgs.getIsMultipartKey())
           .setMultipartUploadPartNumber(keyArgs.getMultipartNumber())
+          .setDataSize(keyArgs.getDataSize())
+          .setLocationInfoList(keyArgs.getKeyLocationsList().stream()
+              .map(OmKeyLocationInfo::getFromProtobuf)
+              .collect(Collectors.toList()))
           .build();
       OmMultipartCommitUploadPartInfo commitUploadPartInfo =
           impl.commitMultipartUploadPart(omKeyArgs, request.getClientID());
@@ -849,4 +875,42 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
     }
     return resp.build();
   }
+
+
+  private MultipartUploadCompleteResponse completeMultipartUpload(
+      MultipartUploadCompleteRequest request) {
+    MultipartUploadCompleteResponse.Builder response =
+        MultipartUploadCompleteResponse.newBuilder();
+
+    try {
+      KeyArgs keyArgs = request.getKeyArgs();
+      List<Part> partsList = request.getPartsListList();
+
+      TreeMap<Integer, String> partsMap = new TreeMap<>();
+      for (Part part : partsList) {
+        partsMap.put(part.getPartNumber(), part.getPartName());
+      }
+
+      OmMultipartUploadList omMultipartUploadList =
+          new OmMultipartUploadList(partsMap);
+
+      OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+          .setVolumeName(keyArgs.getVolumeName())
+          .setBucketName(keyArgs.getBucketName())
+          .setKeyName(keyArgs.getKeyName())
+          .setMultipartUploadID(keyArgs.getMultipartUploadID())
+          .build();
+      OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = impl
+          .completeMultipartUpload(omKeyArgs, omMultipartUploadList);
+
+      response.setVolume(omMultipartUploadCompleteInfo.getVolume())
+          .setBucket(omMultipartUploadCompleteInfo.getBucket())
+          .setKey(omMultipartUploadCompleteInfo.getKey())
+          .setHash(omMultipartUploadCompleteInfo.getHash());
+      response.setStatus(Status.OK);
+    } catch (IOException ex) {
+      response.setStatus(exceptionToResponseStatus(ex));
+    }
+    return response.build();
+  }
 }