Browse Source

HDDS-1121. Key read failure when data is written parallel in to Ozone.
Contributed by Bharat Viswanadham.

Anu Engineer 6 years ago
parent
commit
02d04bd107

+ 10 - 4
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java

@@ -29,6 +29,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
@@ -76,7 +77,8 @@ public class BlockOutputStream extends OutputStream {
   private final BlockData.Builder containerBlockData;
   private final BlockData.Builder containerBlockData;
   private XceiverClientManager xceiverClientManager;
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
   private XceiverClientSpi xceiverClient;
-  private final Checksum checksum;
+  private final ContainerProtos.ChecksumType checksumType;
+  private final int bytesPerChecksum;
   private final String streamId;
   private final String streamId;
   private int chunkIndex;
   private int chunkIndex;
   private int chunkSize;
   private int chunkSize;
@@ -121,14 +123,16 @@ public class BlockOutputStream extends OutputStream {
    * @param streamBufferFlushSize flush size
    * @param streamBufferFlushSize flush size
    * @param streamBufferMaxSize   max size of the currentBuffer
    * @param streamBufferMaxSize   max size of the currentBuffer
    * @param watchTimeout          watch timeout
    * @param watchTimeout          watch timeout
-   * @param checksum              checksum
+   * @param checksumType          checksum type
+   * @param bytesPerChecksum      Bytes per checksum
    */
    */
   @SuppressWarnings("parameternumber")
   @SuppressWarnings("parameternumber")
   public BlockOutputStream(BlockID blockID, String key,
   public BlockOutputStream(BlockID blockID, String key,
       XceiverClientManager xceiverClientManager, Pipeline pipeline,
       XceiverClientManager xceiverClientManager, Pipeline pipeline,
       String traceID, int chunkSize, long streamBufferFlushSize,
       String traceID, int chunkSize, long streamBufferFlushSize,
       long streamBufferMaxSize, long watchTimeout, List<ByteBuffer> bufferList,
       long streamBufferMaxSize, long watchTimeout, List<ByteBuffer> bufferList,
-      Checksum checksum) throws IOException {
+      ChecksumType checksumType, int bytesPerChecksum)
+      throws IOException {
     this.blockID = blockID;
     this.blockID = blockID;
     this.key = key;
     this.key = key;
     this.traceID = traceID;
     this.traceID = traceID;
@@ -146,7 +150,8 @@ public class BlockOutputStream extends OutputStream {
     this.streamBufferMaxSize = streamBufferMaxSize;
     this.streamBufferMaxSize = streamBufferMaxSize;
     this.watchTimeout = watchTimeout;
     this.watchTimeout = watchTimeout;
     this.bufferList = bufferList;
     this.bufferList = bufferList;
-    this.checksum = checksum;
+    this.checksumType = checksumType;
+    this.bytesPerChecksum = bytesPerChecksum;
 
 
     // A single thread executor handle the responses of async requests
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
     responseExecutor = Executors.newSingleThreadExecutor();
@@ -585,6 +590,7 @@ public class BlockOutputStream extends OutputStream {
   private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
   private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
     int effectiveChunkSize = chunk.remaining();
     int effectiveChunkSize = chunk.remaining();
     ByteString data = ByteString.copyFrom(chunk);
     ByteString data = ByteString.copyFrom(chunk);
+    Checksum checksum = new Checksum(checksumType, bytesPerChecksum);
     ChecksumData checksumData = checksum.computeChecksum(data);
     ChecksumData checksumData = checksum.computeChecksum(data);
     ChunkInfo chunkInfo = ChunkInfo.newBuilder()
     ChunkInfo chunkInfo = ChunkInfo.newBuilder()
         .setChunkName(DigestUtils.md5Hex(key) + "_stream_" + streamId +
         .setChunkName(DigestUtils.md5Hex(key) + "_stream_" + streamId +

+ 2 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java

@@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory;
 
 
 /**
 /**
  * Class to compute and verify checksums for chunks.
  * Class to compute and verify checksums for chunks.
+ *
+ * This class is not thread safe.
  */
  */
 public class Checksum {
 public class Checksum {
 
 

+ 20 - 13
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java

@@ -23,11 +23,12 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.List;
 
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ChecksumType;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
-import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.Token;
 
 
@@ -41,7 +42,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
   private final String key;
   private final String key;
   private final XceiverClientManager xceiverClientManager;
   private final XceiverClientManager xceiverClientManager;
   private final Pipeline pipeline;
   private final Pipeline pipeline;
-  private final Checksum checksum;
+  private final ChecksumType checksumType;
+  private final int bytesPerChecksum;
   private final String requestId;
   private final String requestId;
   private final int chunkSize;
   private final int chunkSize;
   // total number of bytes that should be written to this stream
   // total number of bytes that should be written to this stream
@@ -60,7 +62,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
       XceiverClientManager xceiverClientManager,
       XceiverClientManager xceiverClientManager,
       Pipeline pipeline, String requestId, int chunkSize,
       Pipeline pipeline, String requestId, int chunkSize,
       long length, long streamBufferFlushSize, long streamBufferMaxSize,
       long length, long streamBufferFlushSize, long streamBufferMaxSize,
-      long watchTimeout, List<ByteBuffer> bufferList, Checksum checksum,
+      long watchTimeout, List<ByteBuffer> bufferList,
+      ChecksumType checksumType, int bytesPerChecksum,
       Token<OzoneBlockTokenIdentifier> token) {
       Token<OzoneBlockTokenIdentifier> token) {
     this.outputStream = null;
     this.outputStream = null;
     this.blockID = blockID;
     this.blockID = blockID;
@@ -76,7 +79,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
     this.streamBufferMaxSize = streamBufferMaxSize;
     this.streamBufferMaxSize = streamBufferMaxSize;
     this.watchTimeout = watchTimeout;
     this.watchTimeout = watchTimeout;
     this.bufferList = bufferList;
     this.bufferList = bufferList;
-    this.checksum = checksum;
+    this.checksumType = checksumType;
+    this.bytesPerChecksum = bytesPerChecksum;
   }
   }
 
 
   long getLength() {
   long getLength() {
@@ -105,7 +109,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
       this.outputStream =
       this.outputStream =
           new BlockOutputStream(blockID, key, xceiverClientManager,
           new BlockOutputStream(blockID, key, xceiverClientManager,
               pipeline, requestId, chunkSize, streamBufferFlushSize,
               pipeline, requestId, chunkSize, streamBufferFlushSize,
-              streamBufferMaxSize, watchTimeout, bufferList, checksum);
+              streamBufferMaxSize, watchTimeout, bufferList, checksumType,
+              bytesPerChecksum);
     }
     }
   }
   }
 
 
@@ -198,10 +203,16 @@ public final class BlockOutputStreamEntry extends OutputStream {
     private long watchTimeout;
     private long watchTimeout;
     private List<ByteBuffer> bufferList;
     private List<ByteBuffer> bufferList;
     private Token<OzoneBlockTokenIdentifier> token;
     private Token<OzoneBlockTokenIdentifier> token;
-    private Checksum checksum;
+    private ChecksumType checksumType;
+    private int bytesPerChecksum;
 
 
-    public Builder setChecksum(Checksum cs) {
-      this.checksum = cs;
+    public Builder setChecksumType(ChecksumType type) {
+      this.checksumType = type;
+      return this;
+    }
+
+    public Builder setBytesPerChecksum(int bytes) {
+      this.bytesPerChecksum = bytes;
       return this;
       return this;
     }
     }
 
 
@@ -270,7 +281,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
       return new BlockOutputStreamEntry(blockID, key,
       return new BlockOutputStreamEntry(blockID, key,
           xceiverClientManager, pipeline, requestId, chunkSize,
           xceiverClientManager, pipeline, requestId, chunkSize,
           length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout,
           length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout,
-          bufferList, checksum, token);
+          bufferList, checksumType, bytesPerChecksum, token);
     }
     }
   }
   }
 
 
@@ -294,10 +305,6 @@ public final class BlockOutputStreamEntry extends OutputStream {
     return pipeline;
     return pipeline;
   }
   }
 
 
-  public Checksum getChecksum() {
-    return checksum;
-  }
-
   public String getRequestId() {
   public String getRequestId() {
     return requestId;
     return requestId;
   }
   }

+ 26 - 11
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java

@@ -21,10 +21,12 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ChecksumType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.*;
 import org.apache.hadoop.ozone.om.helpers.*;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -79,7 +81,8 @@ public class KeyOutputStream extends OutputStream {
   private final long streamBufferMaxSize;
   private final long streamBufferMaxSize;
   private final long watchTimeout;
   private final long watchTimeout;
   private final long blockSize;
   private final long blockSize;
-  private final Checksum checksum;
+  private final int bytesPerChecksum;
+  private final ChecksumType checksumType;
   private List<ByteBuffer> bufferList;
   private List<ByteBuffer> bufferList;
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private FileEncryptionInfo feInfo;
   private FileEncryptionInfo feInfo;
@@ -106,7 +109,10 @@ public class KeyOutputStream extends OutputStream {
     bufferList.add(buffer);
     bufferList.add(buffer);
     watchTimeout = 0;
     watchTimeout = 0;
     blockSize = 0;
     blockSize = 0;
-    this.checksum = new Checksum();
+    this.checksumType = ChecksumType.valueOf(
+        OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
+    this.bytesPerChecksum = OzoneConfigKeys
+        .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
   }
   }
 
 
   @VisibleForTesting
   @VisibleForTesting
@@ -142,7 +148,8 @@ public class KeyOutputStream extends OutputStream {
       OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize,
       OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize,
       String requestId, ReplicationFactor factor, ReplicationType type,
       String requestId, ReplicationFactor factor, ReplicationType type,
       long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
       long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
-      Checksum checksum, String uploadID, int partNumber, boolean isMultipart) {
+      ChecksumType checksumType, int bytesPerChecksum,
+      String uploadID, int partNumber, boolean isMultipart) {
     this.streamEntries = new ArrayList<>();
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
     this.currentStreamIndex = 0;
     this.omClient = omClient;
     this.omClient = omClient;
@@ -165,7 +172,8 @@ public class KeyOutputStream extends OutputStream {
     this.streamBufferMaxSize = bufferMaxSize;
     this.streamBufferMaxSize = bufferMaxSize;
     this.blockSize = size;
     this.blockSize = size;
     this.watchTimeout = watchTimeout;
     this.watchTimeout = watchTimeout;
-    this.checksum = checksum;
+    this.bytesPerChecksum = bytesPerChecksum;
+    this.checksumType = checksumType;
 
 
     Preconditions.checkState(chunkSize > 0);
     Preconditions.checkState(chunkSize > 0);
     Preconditions.checkState(streamBufferFlushSize > 0);
     Preconditions.checkState(streamBufferFlushSize > 0);
@@ -220,7 +228,8 @@ public class KeyOutputStream extends OutputStream {
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setWatchTimeout(watchTimeout)
             .setWatchTimeout(watchTimeout)
             .setBufferList(bufferList)
             .setBufferList(bufferList)
-            .setChecksum(checksum)
+            .setChecksumType(checksumType)
+            .setBytesPerChecksum(bytesPerChecksum)
             .setToken(subKeyInfo.getToken());
             .setToken(subKeyInfo.getToken());
     streamEntries.add(builder.build());
     streamEntries.add(builder.build());
   }
   }
@@ -573,7 +582,8 @@ public class KeyOutputStream extends OutputStream {
     private long streamBufferMaxSize;
     private long streamBufferMaxSize;
     private long blockSize;
     private long blockSize;
     private long watchTimeout;
     private long watchTimeout;
-    private Checksum checksum;
+    private ChecksumType checksumType;
+    private int bytesPerChecksum;
     private String multipartUploadID;
     private String multipartUploadID;
     private int multipartNumber;
     private int multipartNumber;
     private boolean isMultipartKey;
     private boolean isMultipartKey;
@@ -651,8 +661,13 @@ public class KeyOutputStream extends OutputStream {
       return this;
       return this;
     }
     }
 
 
-    public Builder setChecksum(Checksum checksumObj){
-      this.checksum = checksumObj;
+    public Builder setChecksumType(ChecksumType cType){
+      this.checksumType = cType;
+      return this;
+    }
+
+    public Builder setBytesPerChecksum(int bytes){
+      this.bytesPerChecksum = bytes;
       return this;
       return this;
     }
     }
 
 
@@ -664,8 +679,8 @@ public class KeyOutputStream extends OutputStream {
     public KeyOutputStream build() throws IOException {
     public KeyOutputStream build() throws IOException {
       return new KeyOutputStream(openHandler, xceiverManager, scmClient,
       return new KeyOutputStream(openHandler, xceiverManager, scmClient,
           omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
           omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
-          streamBufferMaxSize, blockSize, watchTimeout, checksum,
-          multipartUploadID, multipartNumber, isMultipartKey);
+          streamBufferMaxSize, blockSize, watchTimeout, checksumType,
+          bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey);
     }
     }
   }
   }
 
 

+ 11 - 9
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -48,7 +48,6 @@ import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
-import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -110,7 +109,8 @@ public class RpcClient implements ClientProtocol {
       ozoneManagerClient;
       ozoneManagerClient;
   private final XceiverClientManager xceiverClientManager;
   private final XceiverClientManager xceiverClientManager;
   private final int chunkSize;
   private final int chunkSize;
-  private final Checksum checksum;
+  private final ChecksumType checksumType;
+  private final int bytesPerChecksum;
   private final UserGroupInformation ugi;
   private final UserGroupInformation ugi;
   private final OzoneAcl.OzoneACLRights userRights;
   private final OzoneAcl.OzoneACLRights userRights;
   private final OzoneAcl.OzoneACLRights groupRights;
   private final OzoneAcl.OzoneACLRights groupRights;
@@ -189,22 +189,22 @@ public class RpcClient implements ClientProtocol {
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT,
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT,
         StorageUnit.BYTES);
         StorageUnit.BYTES);
-    int checksumSize;
     if(configuredChecksumSize <
     if(configuredChecksumSize <
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) {
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) {
       LOG.warn("The checksum size ({}) is not allowed to be less than the " +
       LOG.warn("The checksum size ({}) is not allowed to be less than the " +
               "minimum size ({}), resetting to the minimum size.",
               "minimum size ({}), resetting to the minimum size.",
           configuredChecksumSize,
           configuredChecksumSize,
           OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE);
           OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE);
-      checksumSize = OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
+      bytesPerChecksum =
+          OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
     } else {
     } else {
-      checksumSize = configuredChecksumSize;
+      bytesPerChecksum = configuredChecksumSize;
     }
     }
+
     String checksumTypeStr = conf.get(
     String checksumTypeStr = conf.get(
         OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE,
         OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE,
         OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
         OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
-    ChecksumType checksumType = ChecksumType.valueOf(checksumTypeStr);
-    this.checksum = new Checksum(checksumType, checksumSize);
+    checksumType = ChecksumType.valueOf(checksumTypeStr);
   }
   }
 
 
   private InetSocketAddress getScmAddressForClient() throws IOException {
   private InetSocketAddress getScmAddressForClient() throws IOException {
@@ -602,7 +602,8 @@ public class RpcClient implements ClientProtocol {
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setWatchTimeout(watchTimeout)
             .setWatchTimeout(watchTimeout)
             .setBlockSize(blockSize)
             .setBlockSize(blockSize)
-            .setChecksum(checksum)
+            .setChecksumType(checksumType)
+            .setBytesPerChecksum(bytesPerChecksum)
             .build();
             .build();
     groupOutputStream.addPreallocateBlocks(
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),
         openKey.getKeyInfo().getLatestVersionLocations(),
@@ -863,7 +864,8 @@ public class RpcClient implements ClientProtocol {
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setWatchTimeout(watchTimeout)
             .setWatchTimeout(watchTimeout)
             .setBlockSize(blockSize)
             .setBlockSize(blockSize)
-            .setChecksum(checksum)
+            .setBytesPerChecksum(bytesPerChecksum)
+            .setChecksumType(checksumType)
             .setMultipartNumber(partNumber)
             .setMultipartNumber(partNumber)
             .setMultipartUploadID(uploadID)
             .setMultipartUploadID(uploadID)
             .setIsMultipartKey(true)
             .setIsMultipartKey(true)

+ 63 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java

@@ -27,6 +27,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
@@ -681,6 +684,66 @@ public abstract class TestOzoneRpcClientAbstract {
     }
     }
   }
   }
 
 
+
+  @Test
+  public void testPutKeyRatisThreeNodesParallel() throws IOException,
+      InterruptedException {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    long currentTime = Time.now();
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    CountDownLatch latch = new CountDownLatch(2);
+    AtomicInteger failCount = new AtomicInteger(0);
+
+    Runnable r = () -> {
+      try {
+        for (int i = 0; i < 5; i++) {
+          String keyName = UUID.randomUUID().toString();
+          String data = generateData(5 * 1024 * 1024,
+              (byte) RandomUtils.nextLong()).toString();
+          OzoneOutputStream out = bucket.createKey(keyName,
+              data.getBytes().length, ReplicationType.RATIS,
+              ReplicationFactor.THREE, new HashMap<>());
+          out.write(data.getBytes());
+          out.close();
+          OzoneKey key = bucket.getKey(keyName);
+          Assert.assertEquals(keyName, key.getName());
+          OzoneInputStream is = bucket.readKey(keyName);
+          byte[] fileContent = new byte[data.getBytes().length];
+          is.read(fileContent);
+          is.close();
+          Assert.assertTrue(verifyRatisReplication(volumeName, bucketName,
+              keyName, ReplicationType.RATIS,
+              ReplicationFactor.THREE));
+          Assert.assertEquals(data, new String(fileContent));
+          Assert.assertTrue(key.getCreationTime() >= currentTime);
+          Assert.assertTrue(key.getModificationTime() >= currentTime);
+        }
+        latch.countDown();
+      } catch (IOException ex) {
+        latch.countDown();
+        failCount.incrementAndGet();
+      }
+    };
+
+    Thread thread1 = new Thread(r);
+    Thread thread2 = new Thread(r);
+
+    thread1.start();
+    thread2.start();
+
+    latch.await(600, TimeUnit.SECONDS);
+
+    if (failCount.get() > 0) {
+      fail("testPutKeyRatisThreeNodesParallel failed");
+    }
+
+  }
+
   private void readKey(OzoneBucket bucket, String keyName, String data)
   private void readKey(OzoneBucket bucket, String keyName, String data)
       throws IOException {
       throws IOException {
     OzoneKey key = bucket.getKey(keyName);
     OzoneKey key = bucket.getKey(keyName);

+ 12 - 10
hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java

@@ -24,11 +24,11 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ChecksumType;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
-import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -86,7 +86,8 @@ public final class DistributedStorageHandler implements StorageHandler {
   private final long streamBufferMaxSize;
   private final long streamBufferMaxSize;
   private final long watchTimeout;
   private final long watchTimeout;
   private final long blockSize;
   private final long blockSize;
-  private final Checksum checksum;
+  private final ChecksumType checksumType;
+  private final int bytesPerChecksum;
 
 
   /**
   /**
    * Creates a new DistributedStorageHandler.
    * Creates a new DistributedStorageHandler.
@@ -136,23 +137,23 @@ public final class DistributedStorageHandler implements StorageHandler {
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT,
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT,
         StorageUnit.BYTES);
         StorageUnit.BYTES);
-    int checksumSize;
+
     if(configuredChecksumSize <
     if(configuredChecksumSize <
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) {
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) {
       LOG.warn("The checksum size ({}) is not allowed to be less than the " +
       LOG.warn("The checksum size ({}) is not allowed to be less than the " +
               "minimum size ({}), resetting to the minimum size.",
               "minimum size ({}), resetting to the minimum size.",
           configuredChecksumSize,
           configuredChecksumSize,
           OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE);
           OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE);
-      checksumSize = OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
+      bytesPerChecksum =
+          OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
     } else {
     } else {
-      checksumSize = configuredChecksumSize;
+      bytesPerChecksum = configuredChecksumSize;
     }
     }
     String checksumTypeStr = conf.get(
     String checksumTypeStr = conf.get(
         OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE,
         OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE,
         OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
         OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
-    ContainerProtos.ChecksumType checksumType = ContainerProtos.ChecksumType
-        .valueOf(checksumTypeStr);
-    this.checksum = new Checksum(checksumType, checksumSize);
+    this.checksumType = ChecksumType.valueOf(checksumTypeStr);
+
   }
   }
 
 
   @Override
   @Override
@@ -451,7 +452,8 @@ public final class DistributedStorageHandler implements StorageHandler {
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setStreamBufferMaxSize(streamBufferMaxSize)
             .setBlockSize(blockSize)
             .setBlockSize(blockSize)
             .setWatchTimeout(watchTimeout)
             .setWatchTimeout(watchTimeout)
-            .setChecksum(checksum)
+            .setChecksumType(checksumType)
+            .setBytesPerChecksum(bytesPerChecksum)
             .build();
             .build();
     groupOutputStream.addPreallocateBlocks(
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),
         openKey.getKeyInfo().getLatestVersionLocations(),