
HDDS-889. MultipartUpload: Support uploading a part file in ozone. Contributed by Bharat Viswanadham.

Márton Elek 6 years ago
parent
commit
d44b37d7ac
21 changed files with 718 additions and 56 deletions
  1. +7 -0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
  2. +39 -8
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
  3. +10 -0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
  4. +5 -0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
  5. +12 -0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
  6. +50 -0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
  7. +2 -1
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
  8. +40 -2
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
  9. +6 -0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
  10. +34 -0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartCommitUploadPartInfo.java
  11. +12 -0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java
  12. +7 -1
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
  13. +58 -2
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
  14. +19 -0
      hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
  15. +180 -8
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
  16. +5 -0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
  17. +152 -33
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
  18. +11 -0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
  19. +27 -0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
  20. +3 -1
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
  21. +39 -0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
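
Taken together, these changes let a client stream one part of a multipart upload and read back the committed part's name. Before the per-file diffs, here is a minimal client-side sketch of the new flow, mirroring the new integration tests in TestOzoneRpcClient further down; the ObjectStore handle and the volume, bucket and key names are illustrative assumptions, not part of this change:

import java.io.IOException;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;

public final class UploadPartSketch {
  /** Uploads one part and returns its server-assigned part name. */
  static String uploadOnePart(ObjectStore store, String data)
      throws IOException {
    // Assumed: volume "vol1" and bucket "bucket1" already exist.
    OzoneBucket bucket = store.getVolume("vol1").getBucket("bucket1");

    // Pre-existing API: start the upload and remember its ID.
    OmMultipartInfo info = bucket.initiateMultipartUpload("key1",
        ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
    String uploadID = info.getUploadID();

    // New in this commit: open an output stream for part number 1.
    OzoneOutputStream out = bucket.createMultipartKey("key1",
        data.length(), 1, uploadID);
    out.write(DFSUtil.string2Bytes(data), 0, data.length());
    out.close(); // close() commits the part, not the whole key

    // New in this commit: the commit result carries the part name.
    OmMultipartCommitUploadPartInfo commitInfo =
        out.getCommitUploadPartInfo();
    return commitInfo.getPartName();
  }
}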

+ 7 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java

@@ -353,6 +353,13 @@ public class OzoneBucket {
         defaultReplication);
   }
 
+  public OzoneOutputStream createMultipartKey(String key, long size,
+                                              int partNumber, String uploadID)
+      throws IOException {
+    return proxy.createMultipartKey(volumeName, name, key, size, partNumber,
+        uploadID);
+  }
+
   /**
    * An Iterator to iterate over {@link OzoneKey} list.
    */

+ 39 - 8
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java

@@ -25,13 +25,9 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.ozone.common.Checksum;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.*;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -85,6 +81,7 @@ public class ChunkGroupOutputStream extends OutputStream {
   private final long blockSize;
   private final Checksum checksum;
   private List<ByteBuffer> bufferList;
+  private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   /**
    * A constructor for testing purpose only.
    */
@@ -152,7 +149,7 @@ public class ChunkGroupOutputStream extends OutputStream {
       OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize,
       String requestId, ReplicationFactor factor, ReplicationType type,
       long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
-      Checksum checksum) {
+      Checksum checksum, String uploadID, int partNumber, boolean isMultipart) {
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
     this.omClient = omClient;
@@ -161,6 +158,8 @@ public class ChunkGroupOutputStream extends OutputStream {
     this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
         .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
         .setType(type).setFactor(factor).setDataSize(info.getDataSize())
+        .setIsMultipartKey(isMultipart).setMultipartUploadID(
+            uploadID).setMultipartUploadPartNumber(partNumber)
         .build();
     this.openID = handler.getId();
     this.xceiverClientManager = xceiverClientManager;
@@ -498,7 +497,15 @@ public class ChunkGroupOutputStream extends OutputStream {
         removeEmptyBlocks();
         keyArgs.setDataSize(getKeyLength());
         keyArgs.setLocationInfoList(getLocationInfoList());
-        omClient.commitKey(keyArgs, openID);
+        // When the key is a multipart upload part, we should not commit
+        // the key, as this is not an actual key; it is just one part of a
+        // large file.
+        if (keyArgs.getIsMultipartKey()) {
+          commitUploadPartInfo = omClient.commitMultipartUploadPart(keyArgs,
+              openID);
+        } else {
+          omClient.commitKey(keyArgs, openID);
+        }
       } else {
         LOG.warn("Closing ChunkGroupOutputStream, but key args is null");
       }
@@ -512,6 +519,10 @@ public class ChunkGroupOutputStream extends OutputStream {
     }
   }
 
+  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    return commitUploadPartInfo;
+  }
+
   /**
    * Builder class of ChunkGroupOutputStream.
    */
@@ -529,6 +540,20 @@ public class ChunkGroupOutputStream extends OutputStream {
     private long blockSize;
     private long watchTimeout;
     private Checksum checksum;
+    private String multipartUploadID;
+    private int multipartNumber;
+    private boolean isMultipartKey;
+
+
+    public Builder setMultipartUploadID(String uploadID) {
+      this.multipartUploadID = uploadID;
+      return this;
+    }
+
+    public Builder setMultipartNumber(int partNumber) {
+      this.multipartNumber = partNumber;
+      return this;
+    }
 
     public Builder setHandler(OpenKeySession handler) {
       this.openHandler = handler;
@@ -597,10 +622,16 @@ public class ChunkGroupOutputStream extends OutputStream {
       return this;
     }
 
+    public Builder setIsMultipartKey(boolean isMultipart) {
+      this.isMultipartKey = isMultipart;
+      return this;
+    }
+
     public ChunkGroupOutputStream build() throws IOException {
       return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
           omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
-          streamBufferMaxSize, blockSize, watchTimeout, checksum);
+          streamBufferMaxSize, blockSize, watchTimeout, checksum,
+          multipartUploadID, multipartNumber, isMultipartKey);
     }
   }
 

+ 10 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java

@@ -17,6 +17,8 @@
 
 package org.apache.hadoop.ozone.client.io;
 
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -58,6 +60,14 @@ public class OzoneOutputStream extends OutputStream {
     outputStream.close();
   }
 
+  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    if (outputStream instanceof ChunkGroupOutputStream) {
+      return ((ChunkGroupOutputStream) outputStream).getCommitUploadPartInfo();
+    }
+    // Otherwise return null.
+    return null;
+  }
+
   public OutputStream getOutputStream() {
     return outputStream;
   }
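
One contract implied by the two additions above, worth making explicit: the part info is populated only when close() takes the multipart branch in ChunkGroupOutputStream, so it is null before close() and always null for ordinary key streams. A small hypothetical illustration (bucket and uploadID assumed to come from an earlier initiateMultipartUpload call):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;

final class CommitInfoTiming {
  static String commitPartOne(OzoneBucket bucket, String keyName,
      String uploadID) throws IOException {
    byte[] data = "sample Value".getBytes(StandardCharsets.UTF_8);
    OzoneOutputStream out =
        bucket.createMultipartKey(keyName, data.length, 1, uploadID);
    // out.getCommitUploadPartInfo() == null here: nothing committed yet.
    out.write(data);
    out.close(); // triggers omClient.commitMultipartUploadPart(...)
    // Populated only now, and only because this is a multipart stream.
    return out.getCommitUploadPartInfo().getPartName();
  }
}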

+ 5 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java

@@ -401,5 +401,10 @@ public interface ClientProtocol {
       bucketName, String keyName, ReplicationType type, ReplicationFactor
       factor) throws IOException;
 
+  OzoneOutputStream createMultipartKey(String volumeName, String bucketName,
+                                       String keyName, long size,
+                                       int partNumber, String uploadID)
+      throws IOException;
+
 
 }

+ 12 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java

@@ -963,4 +963,16 @@ public class RestClient implements ClientProtocol {
     throw new UnsupportedOperationException("Ozone REST protocol does not " +
         "support this operation.");
   }
+
+  @Override
+  public OzoneOutputStream createMultipartKey(String volumeName,
+                                              String bucketName,
+                                              String keyName,
+                                              long size,
+                                              int partNumber,
+                                              String uploadID)
+      throws IOException {
+    throw new UnsupportedOperationException("Ozone REST protocol does not " +
+        "support this operation.");
+  }
 }

+ 50 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -701,4 +701,54 @@ public class RpcClient implements ClientProtocol {
     return multipartInfo;
   }
 
+  @Override
+  public OzoneOutputStream createMultipartKey(String volumeName,
+                                              String bucketName,
+                                              String keyName,
+                                              long size,
+                                              int partNumber,
+                                              String uploadID)
+      throws IOException {
+    HddsClientUtils.verifyResourceName(volumeName, bucketName);
+    HddsClientUtils.checkNotNull(keyName, uploadID);
+    Preconditions.checkArgument(partNumber > 0, "Part number should be " +
+        "greater than zero");
+    Preconditions.checkArgument(size >= 0, "size should be greater than or " +
+        "equal to zero");
+    String requestId = UUID.randomUUID().toString();
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setDataSize(size)
+        .setIsMultipartKey(true)
+        .setMultipartUploadID(uploadID)
+        .build();
+
+    OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
+    ChunkGroupOutputStream groupOutputStream =
+        new ChunkGroupOutputStream.Builder()
+            .setHandler(openKey)
+            .setXceiverClientManager(xceiverClientManager)
+            .setScmClient(storageContainerLocationClient)
+            .setOmClient(ozoneManagerClient)
+            .setChunkSize(chunkSize)
+            .setRequestID(requestId)
+            .setType(openKey.getKeyInfo().getType())
+            .setFactor(openKey.getKeyInfo().getFactor())
+            .setStreamBufferFlushSize(streamBufferFlushSize)
+            .setStreamBufferMaxSize(streamBufferMaxSize)
+            .setWatchTimeout(watchTimeout)
+            .setBlockSize(blockSize)
+            .setChecksum(checksum)
+            .setMultipartNumber(partNumber)
+            .setMultipartUploadID(uploadID)
+            .setIsMultipartKey(true)
+            .build();
+    groupOutputStream.addPreallocateBlocks(
+        openKey.getKeyInfo().getLatestVersionLocations(),
+        openKey.getOpenVersion());
+    return new OzoneOutputStream(groupOutputStream);
+  }
+
 }
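
One behavior of the new RpcClient method worth calling out: the Preconditions checks reject bad arguments on the client, before any key is opened on the OzoneManager. A hypothetical negative example (bucket and uploadID assumed valid, and the bucket assumed to be backed by the RPC client):

import java.io.IOException;
import org.apache.hadoop.ozone.client.OzoneBucket;

final class ValidationSketch {
  static void rejectsPartNumberZero(OzoneBucket bucket, String uploadID) {
    try {
      // Part numbers start at 1, so the RPC client's Preconditions check
      // throws IllegalArgumentException before any RPC is made.
      bucket.createMultipartKey("key1", 10, 0, uploadID);
      throw new AssertionError("part number 0 should have been rejected");
    } catch (IllegalArgumentException expected) {
      // message: "Part number should be greater than zero"
    } catch (IOException unexpected) {
      throw new AssertionError(unexpected);
    }
  }
}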

+ 2 - 1
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java

@@ -47,7 +47,8 @@ public enum OMAction implements AuditAction {
   READ_BUCKET,
   READ_KEY,
   LIST_S3BUCKETS,
-  INITIATE_MULTIPART_UPLOAD;
+  INITIATE_MULTIPART_UPLOAD,
+  COMMIT_MULTIPART_UPLOAD_PARTKEY;
 
   @Override
   public String getAction() {

+ 40 - 2
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java

@@ -39,10 +39,14 @@ public final class OmKeyArgs implements Auditable {
   private final ReplicationType type;
   private final ReplicationFactor factor;
   private List<OmKeyLocationInfo> locationInfoList;
+  private final boolean isMultipartKey;
+  private final String multipartUploadID;
+  private final int multipartUploadPartNumber;
 
   private OmKeyArgs(String volumeName, String bucketName, String keyName,
       long dataSize, ReplicationType type, ReplicationFactor factor,
-      List<OmKeyLocationInfo> locationInfoList) {
+      List<OmKeyLocationInfo> locationInfoList, boolean isMultipart,
+      String uploadID, int partNumber) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.keyName = keyName;
@@ -50,6 +54,21 @@ public final class OmKeyArgs implements Auditable {
     this.type = type;
     this.factor = factor;
     this.locationInfoList = locationInfoList;
+    this.isMultipartKey = isMultipart;
+    this.multipartUploadID = uploadID;
+    this.multipartUploadPartNumber = partNumber;
+  }
+
+  public boolean getIsMultipartKey() {
+    return isMultipartKey;
+  }
+
+  public String getMultipartUploadID() {
+    return multipartUploadID;
+  }
+
+  public int getMultipartUploadPartNumber() {
+    return multipartUploadPartNumber;
   }
 
   public ReplicationType getType() {
@@ -123,6 +142,9 @@ public final class OmKeyArgs implements Auditable {
     private ReplicationType type;
     private ReplicationFactor factor;
     private List<OmKeyLocationInfo> locationInfoList;
+    private boolean isMultipartKey;
+    private String multipartUploadID;
+    private int multipartUploadPartNumber;
 
     public Builder setVolumeName(String volume) {
       this.volumeName = volume;
@@ -159,9 +181,25 @@ public final class OmKeyArgs implements Auditable {
       return this;
     }
 
+    public Builder setIsMultipartKey(boolean isMultipart) {
+      this.isMultipartKey = isMultipart;
+      return this;
+    }
+
+    public Builder setMultipartUploadID(String uploadID) {
+      this.multipartUploadID = uploadID;
+      return this;
+    }
+
+    public Builder setMultipartUploadPartNumber(int partNumber) {
+      this.multipartUploadPartNumber = partNumber;
+      return this;
+    }
+
     public OmKeyArgs build() {
       return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type,
-          factor, locationInfoList);
+          factor, locationInfoList, isMultipartKey, multipartUploadID,
+          multipartUploadPartNumber);
     }
   }
 }

+ 6 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java

@@ -215,6 +215,7 @@ public final class OmKeyInfo {
     private long modificationTime;
     private HddsProtos.ReplicationType type;
     private HddsProtos.ReplicationFactor factor;
+    private boolean isMultipartKey;
 
     public Builder setVolumeName(String volume) {
       this.volumeName = volume;
@@ -262,6 +263,11 @@ public final class OmKeyInfo {
       return this;
     }
 
+    public Builder setIsMultipartKey(boolean isMultipart) {
+      this.isMultipartKey = isMultipart;
+      return this;
+    }
+
     public OmKeyInfo build() {
       return new OmKeyInfo(
           volumeName, bucketName, keyName, omKeyLocationInfoGroups,

+ 34 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartCommitUploadPartInfo.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.helpers;
+
+/**
+ * This class holds information about the response from commit multipart
+ * upload part request.
+ */
+public class OmMultipartCommitUploadPartInfo {
+
+  private final String partName;
+
+  public OmMultipartCommitUploadPartInfo(String name) {
+    this.partName = name;
+  }
+
+  public String getPartName() {
+    return partName;
+  }
+}

+ 12 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java

@@ -51,6 +51,18 @@ public class OmMultipartKeyInfo {
     return uploadID;
   }
 
+  public Map<Integer, PartKeyInfo> getPartKeyInfoList() {
+    return partKeyInfoList;
+  }
+
+  public void addPartKeyInfo(int partNumber, PartKeyInfo partKeyInfo) {
+    this.partKeyInfoList.put(partNumber, partKeyInfo);
+  }
+
+  public PartKeyInfo getPartKeyInfo(int partNumber) {
+    return partKeyInfoList.get(partNumber);
+  }
+
 
   /**
    * Construct OmMultipartInfo from MultipartKeyInfo proto object.

+ 7 - 1
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 package org.apache.hadoop.ozone.om.protocol;
-
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -313,5 +313,11 @@ public interface OzoneManagerProtocol {
    * @throws IOException
    */
   OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException;
+
+
+
+  OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
+      OmKeyArgs omKeyArgs, long clientID) throws IOException;
+
 }
 

+ 58 - 2
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
@@ -71,6 +72,10 @@ import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.LocateKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.LocateKeyResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartCommitUploadPartRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartCommitUploadPartResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartInfoInitiateRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -556,12 +561,27 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
     KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
         .setVolumeName(args.getVolumeName())
         .setBucketName(args.getBucketName())
-        .setFactor(args.getFactor())
-        .setType(args.getType())
         .setKeyName(args.getKeyName());
+
+    if (args.getFactor() != null) {
+      keyArgs.setFactor(args.getFactor());
+    }
+
+    if (args.getType() != null) {
+      keyArgs.setType(args.getType());
+    }
+
     if (args.getDataSize() > 0) {
       keyArgs.setDataSize(args.getDataSize());
     }
+
+    if (args.getMultipartUploadID() != null) {
+      keyArgs.setMultipartUploadID(args.getMultipartUploadID());
+    }
+
+    keyArgs.setIsMultipartKey(args.getIsMultipartKey());
+
+
     req.setKeyArgs(keyArgs.build());
 
     final LocateKeyResponse resp;
@@ -919,4 +939,40 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
     return new OmMultipartInfo(resp.getVolumeName(), resp.getBucketName(), resp
         .getKeyName(), resp.getMultipartUploadID());
   }
+
+  @Override
+  public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
+      OmKeyArgs omKeyArgs, long clientID) throws IOException {
+    MultipartCommitUploadPartRequest.Builder multipartCommitUploadPartRequest
+        = MultipartCommitUploadPartRequest.newBuilder();
+
+    KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
+        .setVolumeName(omKeyArgs.getVolumeName())
+        .setBucketName(omKeyArgs.getBucketName())
+        .setKeyName(omKeyArgs.getKeyName())
+        .setMultipartUploadID(omKeyArgs.getMultipartUploadID())
+        .setIsMultipartKey(omKeyArgs.getIsMultipartKey())
+        .setMultipartNumber(omKeyArgs.getMultipartUploadPartNumber());
+    multipartCommitUploadPartRequest.setClientID(clientID);
+    multipartCommitUploadPartRequest.setKeyArgs(keyArgs.build());
+
+    MultipartCommitUploadPartResponse response;
+
+    try {
+      response = rpcProxy.commitMultipartUploadPart(NULL_RPC_CONTROLLER,
+          multipartCommitUploadPartRequest.build());
+
+    } catch (ServiceException ex) {
+      throw ProtobufHelper.getRemoteException(ex);
+    }
+
+    if (response.getStatus() != Status.OK) {
+      throw new IOException("Commit multipart upload part key failed, error:"
+          + response.getStatus());
+    }
+
+    OmMultipartCommitUploadPartInfo info = new
+        OmMultipartCommitUploadPartInfo(response.getPartName());
+    return info;
+  }
 }

+ 19 - 0
hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto

@@ -61,6 +61,8 @@ enum Status {
     S3_BUCKET_NOT_FOUND = 22;
     S3_BUCKET_ALREADY_EXISTS = 23;
     INITIATE_MULTIPART_UPLOAD_ERROR = 24;
+    MULTIPART_UPLOAD_PARTFILE_ERROR = 25;
+    NO_SUCH_MULTIPART_UPLOAD_ERROR = 26;
 }
 
 
@@ -244,6 +246,9 @@ message KeyArgs {
     optional hadoop.hdds.ReplicationType type = 5;
     optional hadoop.hdds.ReplicationFactor factor = 6;
     repeated KeyLocation keyLocations = 7;
+    optional bool isMultipartKey = 8;
+    optional string multipartUploadID = 9;
+    optional uint32 multipartNumber = 10;
 }
 
 message KeyLocation {
@@ -430,6 +435,17 @@ message PartKeyInfo {
     required KeyInfo partKeyInfo = 3;
 }
 
+message MultipartCommitUploadPartRequest {
+    required KeyArgs keyArgs = 1;
+    required uint64 clientID = 2;
+}
+
+message MultipartCommitUploadPartResponse {
+    // This one is returned as Etag for S3.
+    optional string partName = 1;
+    required Status status = 2;
+}
+
 
 
 /**
@@ -570,4 +586,7 @@ service OzoneManagerService {
 
 
     rpc initiateMultiPartUpload(MultipartInfoInitiateRequest)
     returns (MultipartInfoInitiateResponse);
+
+    rpc commitMultipartUploadPart(MultipartCommitUploadPartRequest)
+    returns (MultipartCommitUploadPartResponse);
 }
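
For reference, a sketch of how the two new messages are used, via the generated protobuf builders that the client- and server-side translators elsewhere in this commit call; the volume, bucket, key and upload ID values are placeholders:

import java.io.IOException;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
    .MultipartCommitUploadPartRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
    .MultipartCommitUploadPartResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;

final class WireSketch {
  // clientID is the open-key session id handed out by openKey().
  static MultipartCommitUploadPartRequest buildRequest(long clientID) {
    return MultipartCommitUploadPartRequest.newBuilder()
        .setClientID(clientID)
        .setKeyArgs(KeyArgs.newBuilder()
            .setVolumeName("vol1")                    // placeholder values
            .setBucketName("bucket1")
            .setKeyName("key1")
            .setIsMultipartKey(true)
            .setMultipartUploadID("placeholder-upload-id")
            .setMultipartNumber(1))
        .build();
  }

  // The part name is what an S3 front end would surface as the part's ETag.
  static String partNameOf(MultipartCommitUploadPartResponse response)
      throws IOException {
    if (response.getStatus() != Status.OK) {
      throw new IOException("Commit failed: " + response.getStatus());
    }
    return response.getPartName();
  }
}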

+ 180 - 8
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java

@@ -31,12 +31,14 @@ import org.apache.hadoop.hdds.scm.XceiverClientRatis;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ozone.*;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.client.*;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -48,15 +50,12 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers
     .KeyValueContainerLocationUtil;
 import org.apache.hadoop.ozone.om.OzoneManager;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.*;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.client.rest.OzoneException;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.protocolPB.
     StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.AfterClass;
@@ -79,6 +78,7 @@ import static org.hamcrest.CoreMatchers.either;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * This class is to test all the public facing APIs of Ozone Client.
@@ -732,7 +732,7 @@ public class TestOzoneRpcClient {
     try {
       // try to read
       readKey(bucket, keyName, value);
-      Assert.fail("Expected exception not thrown");
+      fail("Expected exception not thrown");
     } catch (IOException e) {
       Assert.assertTrue(e.getMessage().contains("Failed to execute command"));
       Assert.assertTrue(
@@ -914,7 +914,7 @@ public class TestOzoneRpcClient {
     try {
       OzoneInputStream is = bucket.readKey(keyName);
       is.read(new byte[100]);
-      Assert.fail("Reading corrupted data should fail.");
+      fail("Reading corrupted data should fail.");
     } catch (OzoneChecksumException e) {
       GenericTestUtils.assertExceptionContains("Checksum mismatch", e);
     }
@@ -1116,7 +1116,7 @@ public class TestOzoneRpcClient {
     OzoneVolume vol = store.getVolume(volume);
     Iterator<? extends OzoneBucket> buckets = vol.listBuckets("");
     while(buckets.hasNext()) {
-      Assert.fail();
+      fail();
     }
   }
 
@@ -1258,7 +1258,7 @@ public class TestOzoneRpcClient {
     OzoneBucket buc = vol.getBucket(bucket);
     Iterator<? extends OzoneKey> keys = buc.listKeys("");
     while(keys.hasNext()) {
-      Assert.fail();
+      fail();
     }
   }
 
@@ -1296,6 +1296,7 @@ public class TestOzoneRpcClient {
     assertNotNull(multipartInfo.getUploadID());
   }
 
+
   @Test
   public void testInitiateMultipartUploadWithDefaultReplication() throws
       IOException {
@@ -1329,6 +1330,177 @@ public class TestOzoneRpcClient {
   }
 
 
+  @Test
+  public void testUploadPartWithNoOverride() throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+    String sampleData = "sample Value";
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
+        ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+
+    assertNotNull(multipartInfo);
+    String uploadID = multipartInfo.getUploadID();
+    Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
+    Assert.assertEquals(bucketName, multipartInfo.getBucketName());
+    Assert.assertEquals(keyName, multipartInfo.getKeyName());
+    assertNotNull(multipartInfo.getUploadID());
+
+    OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
+        sampleData.length(), 1, uploadID);
+    ozoneOutputStream.write(DFSUtil.string2Bytes(sampleData), 0,
+        sampleData.length());
+    ozoneOutputStream.close();
+
+    OmMultipartCommitUploadPartInfo commitUploadPartInfo = ozoneOutputStream
+        .getCommitUploadPartInfo();
+
+    assertNotNull(commitUploadPartInfo);
+    String partName = commitUploadPartInfo.getPartName();
+    assertNotNull(commitUploadPartInfo.getPartName());
+
+  }
+
+  @Test
+  public void testUploadPartOverrideWithStandAlone() throws IOException {
+
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+    String sampleData = "sample Value";
+    int partNumber = 1;
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
+        ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+
+    assertNotNull(multipartInfo);
+    String uploadID = multipartInfo.getUploadID();
+    Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
+    Assert.assertEquals(bucketName, multipartInfo.getBucketName());
+    Assert.assertEquals(keyName, multipartInfo.getKeyName());
+    assertNotNull(multipartInfo.getUploadID());
+
+    OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
+        sampleData.length(), partNumber, uploadID);
+    ozoneOutputStream.write(DFSUtil.string2Bytes(sampleData), 0,
+        sampleData.length());
+    ozoneOutputStream.close();
+
+    OmMultipartCommitUploadPartInfo commitUploadPartInfo = ozoneOutputStream
+        .getCommitUploadPartInfo();
+
+    assertNotNull(commitUploadPartInfo);
+    String partName = commitUploadPartInfo.getPartName();
+    assertNotNull(commitUploadPartInfo.getPartName());
+
+    //Overwrite the part by creating part key with same part number.
+    sampleData = "sample Data Changed";
+    ozoneOutputStream = bucket.createMultipartKey(keyName,
+        sampleData.length(), partNumber, uploadID);
+    ozoneOutputStream.write(DFSUtil.string2Bytes(sampleData), 0, "name"
+        .length());
+    ozoneOutputStream.close();
+
+    commitUploadPartInfo = ozoneOutputStream
+        .getCommitUploadPartInfo();
+
+    assertNotNull(commitUploadPartInfo);
+    assertNotNull(commitUploadPartInfo.getPartName());
+
+    // PartName should be different from old part Name.
+    assertNotEquals("Part names should be different", partName,
+        commitUploadPartInfo.getPartName());
+  }
+
+  @Test
+  public void testUploadPartOverrideWithRatis() throws IOException {
+
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+    String sampleData = "sample Value";
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    OmMultipartInfo multipartInfo = bucket.initiateMultipartUpload(keyName,
+        ReplicationType.RATIS, ReplicationFactor.THREE);
+
+    assertNotNull(multipartInfo);
+    String uploadID = multipartInfo.getUploadID();
+    Assert.assertEquals(volumeName, multipartInfo.getVolumeName());
+    Assert.assertEquals(bucketName, multipartInfo.getBucketName());
+    Assert.assertEquals(keyName, multipartInfo.getKeyName());
+    assertNotNull(multipartInfo.getUploadID());
+
+    int partNumber = 1;
+
+    OzoneOutputStream ozoneOutputStream = bucket.createMultipartKey(keyName,
+        sampleData.length(), partNumber, uploadID);
+    ozoneOutputStream.write(DFSUtil.string2Bytes(sampleData), 0,
+        sampleData.length());
+    ozoneOutputStream.close();
+
+    OmMultipartCommitUploadPartInfo commitUploadPartInfo = ozoneOutputStream
+        .getCommitUploadPartInfo();
+
+    assertNotNull(commitUploadPartInfo);
+    String partName = commitUploadPartInfo.getPartName();
+    assertNotNull(commitUploadPartInfo.getPartName());
+
+    //Overwrite the part by creating part key with same part number.
+    sampleData = "sample Data Changed";
+    ozoneOutputStream = bucket.createMultipartKey(keyName,
+        sampleData.length(), partNumber, uploadID);
+    ozoneOutputStream.write(DFSUtil.string2Bytes(sampleData), 0, "name"
+        .length());
+    ozoneOutputStream.close();
+
+    commitUploadPartInfo = ozoneOutputStream
+        .getCommitUploadPartInfo();
+
+    assertNotNull(commitUploadPartInfo);
+    assertNotNull(commitUploadPartInfo.getPartName());
+
+    // PartName should be different from old part Name.
+    assertNotEquals("Part names should be different", partName,
+        commitUploadPartInfo.getPartName());
+  }
+
+  @Test
+  public void testNoSuchUploadError() throws IOException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String keyName = UUID.randomUUID().toString();
+    String sampleData = "sample Value";
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String uploadID = "random";
+    try {
+      bucket.createMultipartKey(keyName, sampleData.length(), 1, uploadID);
+      fail("testNoSuchUploadError failed");
+    } catch (IOException ex) {
+      GenericTestUtils.assertExceptionContains("NO_SUCH_MULTIPART_UPLOAD_ERROR",
+          ex);
+    }
+
+  }
+
+
   /**
    * Close OzoneClient and shutdown MiniOzoneCluster.
    */

+ 5 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java

@@ -21,6 +21,7 @@ import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.utils.BackgroundService;
@@ -190,4 +191,8 @@ public interface KeyManager {
    */
   OmMultipartInfo initiateMultipartUpload(OmKeyArgs keyArgs) throws IOException;
 
+
+  OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
+      OmKeyArgs keyArgs, long clientID) throws IOException;
+
 }

+ 152 - 33
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
@@ -205,17 +206,35 @@ public class KeyManagerImpl implements KeyManager {
     ReplicationType type = args.getType();
     long currentTime = Time.monotonicNowNanos();
 
-    // If user does not specify a replication strategy or
-    // replication factor, OM will use defaults.
-    if (factor == null) {
-      factor = useRatis ? ReplicationFactor.THREE : ReplicationFactor.ONE;
-    }
-
-    if (type == null) {
-      type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
-    }
-
     try {
+      if (args.getIsMultipartKey()) {
+        // When the key is a multipart upload part key, we should take the
+        // replication type and factor from the key that initiated the
+        // multipart upload. If no such key is found, we throw a
+        // no-such-multipart-upload error.
+        String uploadID = args.getMultipartUploadID();
+        Preconditions.checkNotNull(uploadID);
+        String multipartKey = metadataManager.getMultipartKey(volumeName,
+            bucketName, keyName, uploadID);
+        OmKeyInfo partKeyInfo = metadataManager.getOpenKeyTable().get(
+            multipartKey);
+        if (partKeyInfo == null) {
+          throw new OMException("No such multipart upload with specified " +
+              "uploadID " + uploadID, ResultCodes.NO_SUCH_MULTIPART_UPLOAD);
+        } else {
+          factor = partKeyInfo.getFactor();
+          type = partKeyInfo.getType();
+        }
+      } else {
+        // If user does not specify a replication strategy or
+        // replication factor, OM will use defaults.
+        if (factor == null) {
+          factor = useRatis ? ReplicationFactor.THREE : ReplicationFactor.ONE;
+        }
+        if (type == null) {
+          type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
+        }
+      }
       long requestedSize = Math.min(preallocateMax, args.getDataSize());
       List<OmKeyLocationInfo> locations = new ArrayList<>();
       String objectKey = metadataManager.getOzoneKey(
@@ -254,31 +273,28 @@ public class KeyManagerImpl implements KeyManager {
       // value, then this value is used, otherwise, we allocate a single block
       // which is the current size, if read by the client.
       long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize;
-      OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey);
+      OmKeyInfo keyInfo;
       long openVersion;
-      if (keyInfo != null) {
-        // the key already exist, the new blocks will be added as new version
-        // when locations.size = 0, the new version will have identical blocks
-        // as its previous version
-        openVersion = keyInfo.addNewVersion(locations);
-        keyInfo.setDataSize(size + keyInfo.getDataSize());
-      } else {
-        // the key does not exist, create a new object, the new blocks are the
-        // version 0
-
-        keyInfo = new OmKeyInfo.Builder()
-            .setVolumeName(args.getVolumeName())
-            .setBucketName(args.getBucketName())
-            .setKeyName(args.getKeyName())
-            .setOmKeyLocationInfos(Collections.singletonList(
-                new OmKeyLocationInfoGroup(0, locations)))
-            .setCreationTime(Time.now())
-            .setModificationTime(Time.now())
-            .setDataSize(size)
-            .setReplicationType(type)
-            .setReplicationFactor(factor)
-            .build();
+
+      if (args.getIsMultipartKey()) {
+        // For an upload part we don't need to check the KeyTable, as this
+        // is not an actual key; it is only a part of one.
+        keyInfo = createKeyInfo(args, locations, factor, type, size);
         openVersion = 0;
+      } else {
+        keyInfo = metadataManager.getKeyTable().get(objectKey);
+        if (keyInfo != null) {
+          // the key already exist, the new blocks will be added as new version
+          // when locations.size = 0, the new version will have identical blocks
+          // as its previous version
+          openVersion = keyInfo.addNewVersion(locations);
+          keyInfo.setDataSize(size + keyInfo.getDataSize());
+        } else {
+          // the key does not exist, create a new object, the new blocks are the
+          // version 0
+          keyInfo = createKeyInfo(args, locations, factor, type, size);
+          openVersion = 0;
+        }
       }
       String openKey = metadataManager.getOpenKey(
           volumeName, bucketName, keyName, currentTime);
@@ -311,6 +327,33 @@ public class KeyManagerImpl implements KeyManager {
     }
   }
 
+  /**
+   * Create an OmKeyInfo object from the supplied arguments.
+   * @param keyArgs volume, bucket and key name of the new key
+   * @param locations preallocated block locations for version 0
+   * @param factor replication factor
+   * @param type replication type
+   * @param size data size of the key
+   * @return the new OmKeyInfo
+   */
+  private OmKeyInfo createKeyInfo(OmKeyArgs keyArgs,
+                                  List<OmKeyLocationInfo> locations,
+                                  ReplicationFactor factor,
+                                  ReplicationType type, long size) {
+    return new OmKeyInfo.Builder()
+        .setVolumeName(keyArgs.getVolumeName())
+        .setBucketName(keyArgs.getBucketName())
+        .setKeyName(keyArgs.getKeyName())
+        .setOmKeyLocationInfos(Collections.singletonList(
+            new OmKeyLocationInfoGroup(0, locations)))
+        .setCreationTime(Time.now())
+        .setModificationTime(Time.now())
+        .setDataSize(size)
+        .setReplicationType(type)
+        .setReplicationFactor(factor)
+        .build();
+  }
+
   @Override
   public void commitKey(OmKeyArgs args, long clientID) throws IOException {
     Preconditions.checkNotNull(args);
@@ -610,4 +653,80 @@ public class KeyManagerImpl implements KeyManager {
       metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
     }
   }
+
+  @Override
+  public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
+      OmKeyArgs omKeyArgs, long clientID) throws IOException {
+    Preconditions.checkNotNull(omKeyArgs);
+    String volumeName = omKeyArgs.getVolumeName();
+    String bucketName = omKeyArgs.getBucketName();
+    String keyName = omKeyArgs.getKeyName();
+    String uploadID = omKeyArgs.getMultipartUploadID();
+    int partNumber = omKeyArgs.getMultipartUploadPartNumber();
+
+    metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
+    String partName;
+    try {
+      String multipartKey = metadataManager.getMultipartKey(volumeName,
+          bucketName, keyName, uploadID);
+      OmMultipartKeyInfo multipartKeyInfo = metadataManager
+          .getMultipartInfoTable().get(multipartKey);
+
+      String openKey = metadataManager.getOpenKey(
+          volumeName, bucketName, keyName, clientID);
+      OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(
+          openKey);
+
+      partName = keyName + clientID;
+      if (multipartKeyInfo == null) {
+        throw new OMException("No such multipart upload with specified " +
+            "uploadID " + uploadID, ResultCodes.NO_SUCH_MULTIPART_UPLOAD);
+      } else {
+        PartKeyInfo oldPartKeyInfo =
+            multipartKeyInfo.getPartKeyInfo(partNumber);
+        PartKeyInfo.Builder partKeyInfo = PartKeyInfo.newBuilder();
+        partKeyInfo.setPartName(partName);
+        partKeyInfo.setPartNumber(partNumber);
+        partKeyInfo.setPartKeyInfo(keyInfo.getProtobuf());
+        multipartKeyInfo.addPartKeyInfo(partNumber, partKeyInfo.build());
+        if (oldPartKeyInfo == null) {
+          // This is the first time part is being added.
+          DBStore store = metadataManager.getStore();
+          try (BatchOperation batch = store.initBatchOperation()) {
+            metadataManager.getOpenKeyTable().deleteWithBatch(batch, openKey);
+            metadataManager.getMultipartInfoTable().putWithBatch(batch,
+                multipartKey, multipartKeyInfo);
+            store.commitBatchOperation(batch);
+          }
+        } else {
+          // If we have this part already, that means we are overriding it.
+          // We need three steps:
+          // 1. Add the old entry to the delete table.
+          // 2. Remove the new entry from the openKey table.
+          // 3. Add the new entry to the list of part keys.
+          DBStore store = metadataManager.getStore();
+          try (BatchOperation batch = store.initBatchOperation()) {
+            metadataManager.getDeletedTable().putWithBatch(batch,
+                oldPartKeyInfo.getPartName(),
+                OmKeyInfo.getFromProtobuf(oldPartKeyInfo.getPartKeyInfo()));
+            metadataManager.getOpenKeyTable().deleteWithBatch(batch, openKey);
+            metadataManager.getMultipartInfoTable().putWithBatch(batch,
+                multipartKey, multipartKeyInfo);
+            store.commitBatchOperation(batch);
+          }
+        }
+      }
+    } catch (IOException ex) {
+      LOG.error("Upload part Failed: volume:{} bucket:{} " +
+          "key:{} PartNumber: {}", volumeName, bucketName, keyName,
+          partNumber, ex);
+      throw new OMException(ex.getMessage(), ResultCodes.UPLOAD_PART_FAILED);
+    } finally {
+      metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
+    }
+
+    return new OmMultipartCommitUploadPartInfo(partName);
+
+  }
+
 }
 }
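
To make the bookkeeping in commitMultipartUploadPart easier to follow, here is a deliberately simplified in-memory model of the same logic. Plain HashMaps stand in for the RocksDB-backed open-key, multipart-info and deleted tables, and the Part class stands in for PartKeyInfo; names and types here are illustrative, not the real MetadataManager API. In the real code all table updates for one commit happen in a single batch under the bucket lock:

import java.util.HashMap;
import java.util.Map;

final class MultipartCommitModel {

  /** Toy stand-in for one PartKeyInfo entry. */
  static final class Part {
    final String partName;
    final String keyInfo;
    Part(String partName, String keyInfo) {
      this.partName = partName;
      this.keyInfo = keyInfo;
    }
  }

  final Map<String, String> openKeyTable = new HashMap<>();
  final Map<String, Map<Integer, Part>> multipartInfoTable = new HashMap<>();
  final Map<String, String> deletedTable = new HashMap<>();

  String commitPart(String multipartKey, String openKey, String keyName,
      long clientID, int partNumber) {
    Map<Integer, Part> parts = multipartInfoTable.get(multipartKey);
    if (parts == null) {
      // Mirrors ResultCodes.NO_SUCH_MULTIPART_UPLOAD.
      throw new IllegalStateException("NO_SUCH_MULTIPART_UPLOAD");
    }
    // The part name is the key name plus the open-key session (client) id.
    String partName = keyName + clientID;
    // Move the committed key info out of the open-key table.
    String keyInfo = openKeyTable.remove(openKey);
    // Record (or replace) this part number's entry.
    Part old = parts.put(partNumber, new Part(partName, keyInfo));
    if (old != null) {
      // Overriding an existing part: park the old part's key info in the
      // deleted table so its blocks can be reclaimed later.
      deletedTable.put(old.partName, old.keyInfo);
    }
    return partName;
  }
}

Because the part name is derived from the key name plus the open-key client ID, re-uploading the same part number yields a different part name, which is exactly what the override tests in TestOzoneRpcClient above assert.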

+ 11 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java

@@ -86,6 +86,8 @@ public class OMMetrics {
   private @Metric MutableCounterLong numGetServiceListFails;
   private @Metric MutableCounterLong numListS3BucketsFails;
   private @Metric MutableCounterLong numInitiateMultipartUploadFails;
+  private @Metric MutableCounterLong numCommitMultipartUploadParts;
+  private @Metric MutableCounterLong numCommitMultipartUploadPartFails;
 
   // Metrics for total number of volumes, buckets and keys
 
@@ -236,6 +238,15 @@ public class OMMetrics {
     numInitiateMultipartUploadFails.incr();
   }
 
+  public void incNumCommitMultipartUploadParts() {
+    numKeyOps.incr();
+    numCommitMultipartUploadParts.incr();
+  }
+
+  public void incNumCommitMultipartUploadPartFails() {
+    numCommitMultipartUploadPartFails.incr();
+  }
 
   public void incNumGetServiceLists() {
     numGetServiceLists.incr();
   }
+ 27 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -61,6 +61,7 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
@@ -1574,6 +1575,32 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     return multipartInfo;
   }
 
+  @Override
+  public OmMultipartCommitUploadPartInfo commitMultipartUploadPart(
+      OmKeyArgs keyArgs, long clientID) throws IOException {
+    boolean auditSuccess = false;
+    OmMultipartCommitUploadPartInfo commitUploadPartInfo;
+    metrics.incNumCommitMultipartUploadParts();
+    try {
+      commitUploadPartInfo = keyManager.commitMultipartUploadPart(keyArgs,
+          clientID);
+      auditSuccess = true;
+    } catch (IOException ex) {
+      AUDIT.logWriteFailure(buildAuditMessageForFailure(
+          OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY, (keyArgs == null) ? null :
+              keyArgs.toAuditMap(), ex));
+      metrics.incNumCommitMultipartUploadPartFails();
+      throw ex;
+    } finally {
+      if (auditSuccess) {
+        AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
+            OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY, (keyArgs == null) ? null :
+                keyArgs.toAuditMap()));
+      }
+    }
+    return commitUploadPartInfo;
+  }
+
 
 
 

+ 3 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java

@@ -116,6 +116,8 @@ public class OMException extends IOException {
     SCM_IN_CHILL_MODE,
     S3_BUCKET_ALREADY_EXISTS,
     S3_BUCKET_NOT_FOUND,
-    INITIATE_MULTIPART_UPLOAD_FAILED;
+    INITIATE_MULTIPART_UPLOAD_FAILED,
+    NO_SUCH_MULTIPART_UPLOAD,
+    UPLOAD_PART_FAILED;
   }
 }

+ 39 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
@@ -86,6 +87,10 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .LocateKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .LocateKeyResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartCommitUploadPartRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .MultipartCommitUploadPartResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartInfoInitiateRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -197,6 +202,11 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
         return Status.S3_BUCKET_NOT_FOUND;
       case INITIATE_MULTIPART_UPLOAD_FAILED:
         return Status.INITIATE_MULTIPART_UPLOAD_ERROR;
+      case NO_SUCH_MULTIPART_UPLOAD:
+        return Status.NO_SUCH_MULTIPART_UPLOAD_ERROR;
+      case UPLOAD_PART_FAILED:
+        return Status.MULTIPART_UPLOAD_PARTFILE_ERROR;
+
       default:
         return Status.INTERNAL_ERROR;
       }
@@ -378,6 +388,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
           .setDataSize(keyArgs.getDataSize())
           .setType(type)
           .setFactor(factor)
+          .setIsMultipartKey(keyArgs.getIsMultipartKey())
+          .setMultipartUploadID(keyArgs.getMultipartUploadID())
+          .setMultipartUploadPartNumber(keyArgs.getMultipartNumber())
           .build();
       if (keyArgs.hasDataSize()) {
         omKeyArgs.setDataSize(keyArgs.getDataSize());
@@ -684,4 +697,30 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
     }
     return resp.build();
   }
+
+  @Override
+  public MultipartCommitUploadPartResponse commitMultipartUploadPart(
+      RpcController controller, MultipartCommitUploadPartRequest request) {
+    MultipartCommitUploadPartResponse.Builder resp =
+        MultipartCommitUploadPartResponse.newBuilder();
+    try {
+      KeyArgs keyArgs = request.getKeyArgs();
+      OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
+          .setVolumeName(keyArgs.getVolumeName())
+          .setBucketName(keyArgs.getBucketName())
+          .setKeyName(keyArgs.getKeyName())
+          .setMultipartUploadID(keyArgs.getMultipartUploadID())
+          .setIsMultipartKey(keyArgs.getIsMultipartKey())
+          .setMultipartUploadPartNumber(keyArgs.getMultipartNumber())
+          .build();
+      OmMultipartCommitUploadPartInfo commitUploadPartInfo =
+          impl.commitMultipartUploadPart(omKeyArgs, request.getClientID());
+      resp.setPartName(commitUploadPartInfo.getPartName());
+      resp.setStatus(Status.OK);
+    } catch (IOException ex) {
+      resp.setStatus(exceptionToResponseStatus(ex));
+    }
+    return resp.build();
+  }
+
 }