
HDDS-550. Serialize ApplyTransaction calls per Container in ContainerStateMachine. Contributed by Shashikant Banerjee.

Jitendra Pandey 6 years ago
parent
commit
0bc6d0484a
31 changed files with 192 additions and 570 deletions
  1. + 3 - 3
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
  2. + 2 - 2
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
  3. + 1 - 1
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
  4. + 2 - 1
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
  5. + 2 - 2
      hadoop-hdds/common/pom.xml
  6. + 5 - 1
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
  7. + 1 - 1
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
  8. + 5 - 1
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
  9. + 2 - 1
      hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
  10. + 2 - 1
      hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
  11. + 7 - 7
      hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
  12. + 0 - 1
      hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
  13. + 8 - 0
      hadoop-hdds/common/src/main/resources/ozone-default.xml
  14. + 1 - 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
  15. + 4 - 4
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
  16. + 118 - 322
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
  17. + 10 - 5
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
  18. + 1 - 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
  19. + 1 - 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
  20. + 1 - 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
  21. + 3 - 3
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
  22. + 2 - 2
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
  23. + 1 - 1
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
  24. + 5 - 0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
  25. + 0 - 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
  26. + 2 - 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
  27. + 1 - 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java
  28. + 0 - 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
  29. + 0 - 201
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
  30. + 1 - 1
      hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
  31. + 1 - 1
      hadoop-project/pom.xml

+ 3 - 3
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java

@@ -31,9 +31,9 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.util.Time;
-import org.apache.ratis.shaded.io.grpc.ManagedChannel;
-import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

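Note: this import rename recurs across nearly every file below. The Ratis snapshot bump at the bottom of this commit (hadoop-project/pom.xml) picks up a Ratis build that repackages its bundled gRPC, Netty, protobuf, and Guava classes under org.apache.ratis.thirdparty and moves its generated protos out of the shaded namespace. The mechanical mapping applied throughout is:

    org.apache.ratis.shaded.io.grpc.*              ->  org.apache.ratis.thirdparty.io.grpc.*
    org.apache.ratis.shaded.com.google.protobuf.*  ->  org.apache.ratis.thirdparty.com.google.protobuf.*
    org.apache.ratis.shaded.com.google.common.*    ->  org.apache.ratis.thirdparty.com.google.common.*
    org.apache.ratis.shaded.proto.RaftProtos       ->  org.apache.ratis.proto.RaftProtos
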
+ 2 - 2
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

@@ -21,7 +21,7 @@ package org.apache.hadoop.hdds.scm;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.shaded.com.google.protobuf
+import org.apache.ratis.thirdparty.com.google.protobuf
     .InvalidProtocolBufferException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
@@ -39,7 +39,7 @@ import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.CheckedBiConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+ 1 - 1
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java

@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;

+ 2 - 1
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java

@@ -18,8 +18,9 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
+
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;

+ 2 - 2
hadoop-hdds/common/pom.xml

@@ -172,10 +172,10 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
             <phase>generate-sources</phase>
             <configuration>
               <tasks>
-                <replace token="com.google.protobuf" value="org.apache.ratis.shaded.com.google.protobuf"
+                <replace token="com.google.protobuf" value="org.apache.ratis.thirdparty.com.google.protobuf"
                   dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto">
                 </replace>
-                <replace token="io.grpc" value="org.apache.ratis.shaded.io.grpc"
+                <replace token="io.grpc" value="org.apache.ratis.thirdparty.io.grpc"
                   dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto">
                 </replace>
               </tasks>

+ 5 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java

@@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.util.TimeDuration;
 
 import java.util.concurrent.TimeUnit;
@@ -62,6 +62,10 @@ public final class ScmConfigKeys {
       = "dfs.container.ratis.replication.level";
   public static final ReplicationLevel
       DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT = ReplicationLevel.MAJORITY;
+  public static final String DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY
+      = "dfs.container.ratis.num.container.op.executors";
+  public static final int DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT
+      = 10;
   public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY =
       "dfs.container.ratis.segment.size";
   public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT =

+ 1 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java

@@ -20,7 +20,7 @@ package org.apache.hadoop.hdds.scm.storage;
 
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .BlockNotCommittedException;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;

+ 5 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 
-import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.util.TimeDuration;
 
 /**
@@ -220,6 +220,10 @@ public final class OzoneConfigKeys {
   public static final ReplicationLevel
       DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT
       = ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT;
+  public static final String DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY;
+  public static final int DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT;
   public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY
       = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY;
   public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT

+ 2 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java

@@ -22,7 +22,8 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.thirdparty.com.google.common.annotations.
+    VisibleForTesting;
 import org.rocksdb.DbPath;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;

+ 2 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java

@@ -23,7 +23,8 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.utils.RocksDBStoreMBean;
-import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.thirdparty.com.google.common.annotations.
+    VisibleForTesting;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 

+ 7 - 7
hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java

@@ -32,8 +32,8 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.ratis.shaded.proto.RaftProtos;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
@@ -103,7 +103,7 @@ public interface RatisHelper {
   RaftGroupId DUMMY_GROUP_ID =
       RaftGroupId.valueOf(ByteString.copyFromUtf8("AOzoneRatisGroup"));
 
-  RaftGroup EMPTY_GROUP = new RaftGroup(DUMMY_GROUP_ID,
+  RaftGroup EMPTY_GROUP = RaftGroup.valueOf(DUMMY_GROUP_ID,
       Collections.emptyList());
 
   static RaftGroup emptyRaftGroup() {
@@ -112,7 +112,7 @@ public interface RatisHelper {
 
   static RaftGroup newRaftGroup(Collection<RaftPeer> peers) {
     return peers.isEmpty()? emptyRaftGroup()
-        : new RaftGroup(DUMMY_GROUP_ID, peers);
+        : RaftGroup.valueOf(DUMMY_GROUP_ID, peers);
   }
 
   static RaftGroup newRaftGroup(RaftGroupId groupId,
@@ -120,12 +120,12 @@ public interface RatisHelper {
     final List<RaftPeer> newPeers = peers.stream()
         .map(RatisHelper::toRaftPeer)
         .collect(Collectors.toList());
-    return peers.isEmpty() ? new RaftGroup(groupId, Collections.emptyList())
-        : new RaftGroup(groupId, newPeers);
+    return peers.isEmpty() ? RaftGroup.valueOf(groupId, Collections.emptyList())
+        : RaftGroup.valueOf(groupId, newPeers);
   }
 
   static RaftGroup newRaftGroup(Pipeline pipeline) {
-    return new RaftGroup(pipeline.getId().getRaftGroupID(),
+    return RaftGroup.valueOf(pipeline.getId().getRaftGroupID(),
         toRaftPeers(pipeline));
   }
 

+ 0 - 1
hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto

@@ -197,7 +197,6 @@ message ContainerCommandRequestProto {
 
   optional   PutSmallFileRequestProto putSmallFile = 19;
   optional   GetSmallFileRequestProto getSmallFile = 20;
-
   optional   GetCommittedBlockLengthRequestProto getCommittedBlockLength = 21;
 }
 

+ 8 - 0
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -135,6 +135,14 @@
      MAJORITY, MAJORITY is used as the default replication level.
     </description>
   </property>
+  <property>
+    <name>dfs.container.ratis.num.container.op.executors</name>
+    <value>10</value>
+    <tag>OZONE, RATIS, PERFORMANCE</tag>
+    <description>Number of executors that will be used by Ratis to execute
+      container ops (10 by default).
+    </description>
+  </property>
   <property>
     <name>dfs.container.ratis.segment.size</name>
     <value>1073741824</value>

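A minimal sketch of reading this new setting on the datanode side, using the constants added to ScmConfigKeys above (OzoneConfiguration is the standard HDDS Configuration subclass; the class name here is illustrative):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    public final class ContainerOpExecutorConfigExample {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Falls back to the default of 10 when the property is not set in
        // ozone-site.xml.
        int numExecutors = conf.getInt(
            ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY,
            ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT);
        System.out.println("container op executors: " + numExecutors);
      }
    }
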
+ 1 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java

@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.protocol.datanode.proto
     .XceiverClientProtocolServiceGrpc;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

+ 4 - 4
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java

@@ -33,10 +33,10 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 
-import org.apache.ratis.shaded.io.grpc.BindableService;
-import org.apache.ratis.shaded.io.grpc.Server;
-import org.apache.ratis.shaded.io.grpc.ServerBuilder;
-import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
+import org.apache.ratis.thirdparty.io.grpc.BindableService;
+import org.apache.ratis.thirdparty.io.grpc.Server;
+import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

+ 118 - 322
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -26,7 +25,7 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.shaded.com.google.protobuf
+import org.apache.ratis.thirdparty.com.google.protobuf
     .InvalidProtocolBufferException;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage;
@@ -44,10 +43,10 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto;
-import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.SMLogEntryProto;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -57,12 +56,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 /** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
@@ -98,44 +97,43 @@ import java.util.stream.Collectors;
  *
  * 2) Write chunk commit operation is executed after write chunk state machine
  * operation. This will ensure that commit operation is sync'd with the state
- * machine operation.
- *
- * Synchronization between {@link #writeStateMachineData} and
- * {@link #applyTransaction} need to be enforced in the StateMachine
- * implementation. For example, synchronization between writeChunk and
+ * machine operation. For example, synchronization between writeChunk and
  * createContainer in {@link ContainerStateMachine}.
- *
- * PutBlock is synchronized with WriteChunk operations, PutBlock for a block is
- * executed only after all the WriteChunk preceding the PutBlock have finished.
- *
- * CloseContainer is synchronized with WriteChunk and PutBlock operations,
- * CloseContainer for a container is processed after all the preceding write
- * operations for the container have finished.
- * */
+ **/
+
 public class ContainerStateMachine extends BaseStateMachine {
-  static final Logger LOG = LoggerFactory.getLogger(
-      ContainerStateMachine.class);
-  private final SimpleStateMachineStorage storage
-      = new SimpleStateMachineStorage();
+  static final Logger LOG =
+      LoggerFactory.getLogger(ContainerStateMachine.class);
+  private final SimpleStateMachineStorage storage =
+      new SimpleStateMachineStorage();
   private final ContainerDispatcher dispatcher;
   private ThreadPoolExecutor chunkExecutor;
   private final XceiverServerRatis ratisServer;
   private final ConcurrentHashMap<Long, CompletableFuture<Message>>
       writeChunkFutureMap;
-  private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap;
+  private final ConcurrentHashMap<Long, CompletableFuture<Message>>
+      createContainerFutureMap;
+  private ExecutorService[] executors;
+  private final int numExecutors;
   /**
    * CSM metrics.
    */
   private final CSMMetrics metrics;
 
   public ContainerStateMachine(ContainerDispatcher dispatcher,
-      ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer) {
+      ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer,
+      int numOfExecutors) {
     this.dispatcher = dispatcher;
     this.chunkExecutor = chunkExecutor;
     this.ratisServer = ratisServer;
     this.writeChunkFutureMap = new ConcurrentHashMap<>();
-    this.stateMachineMap = new ConcurrentHashMap<>();
     metrics = CSMMetrics.create();
+    this.createContainerFutureMap = new ConcurrentHashMap<>();
+    this.numExecutors = numOfExecutors;
+    executors = new ExecutorService[numExecutors];
+    for (int i = 0; i < numExecutors; i++) {
+      executors[i] = Executors.newSingleThreadExecutor();
+    }
   }
 
   @Override
@@ -229,6 +227,41 @@ public class ContainerStateMachine extends BaseStateMachine {
     return dispatchCommand(requestProto)::toByteString;
   }
 
+  private ExecutorService getCommandExecutor(
+      ContainerCommandRequestProto requestProto) {
+    int executorId = (int)(requestProto.getContainerID() % numExecutors);
+    return executors[executorId];
+  }
+
+  private CompletableFuture<Message> handleWriteChunk(
+      ContainerCommandRequestProto requestProto, long entryIndex) {
+    final WriteChunkRequestProto write = requestProto.getWriteChunk();
+    long containerID = write.getBlockID().getContainerID();
+    CompletableFuture<Message> future =
+        createContainerFutureMap.get(containerID);
+    CompletableFuture<Message> writeChunkFuture;
+    if (future != null) {
+      writeChunkFuture = future.thenApplyAsync(
+          v -> runCommand(requestProto), chunkExecutor);
+    } else {
+      writeChunkFuture = CompletableFuture.supplyAsync(
+          () -> runCommand(requestProto), chunkExecutor);
+    }
+    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+    // Remove the future once it finishes execution from the
+    // writeChunkFutureMap.
+    writeChunkFuture.thenApply(r -> writeChunkFutureMap.remove(entryIndex));
+    return writeChunkFuture;
+  }
+
+  private CompletableFuture<Message> handleCreateContainer(
+      ContainerCommandRequestProto requestProto) {
+    long containerID = requestProto.getContainerID();
+    createContainerFutureMap.
+        computeIfAbsent(containerID, k -> new CompletableFuture<>());
+    return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
+  }
+
   /*
    * writeStateMachineData calls are not synchronized with each other
    * and also with applyTransaction.
@@ -240,17 +273,15 @@ public class ContainerStateMachine extends BaseStateMachine {
       final ContainerCommandRequestProto requestProto =
           getRequestProto(entry.getSmLogEntry().getStateMachineData());
       Type cmdType = requestProto.getCmdType();
-      long containerId = requestProto.getContainerID();
-      stateMachineMap
-          .computeIfAbsent(containerId, k -> new StateMachineHelper());
-      CompletableFuture<Message> stateMachineFuture =
-          stateMachineMap.get(containerId)
-              .handleStateMachineData(requestProto, entry.getIndex());
-      if (stateMachineFuture == null) {
-        throw new IllegalStateException(
-            "Cmd Type:" + cmdType + " should not have state machine data");
+      switch (cmdType) {
+      case CreateContainer:
+        return handleCreateContainer(requestProto);
+      case WriteChunk:
+        return handleWriteChunk(requestProto, entry.getIndex());
+      default:
+        throw new IllegalStateException("Cmd Type:" + cmdType
+            + " should not have state machine data");
       }
-      return stateMachineFuture;
     } catch (IOException e) {
       metrics.incNumWriteStateMachineFails();
       return completeExceptionally(e);
@@ -270,14 +301,14 @@ public class ContainerStateMachine extends BaseStateMachine {
     }
   }
 
-  private LogEntryProto readStateMachineData(LogEntryProto entry,
+  private ByteString readStateMachineData(LogEntryProto entry,
       ContainerCommandRequestProto requestProto) {
     WriteChunkRequestProto writeChunkRequestProto =
         requestProto.getWriteChunk();
     // Assert that store log entry is for COMMIT_DATA, the WRITE_DATA is
     // written through writeStateMachineData.
-    Preconditions.checkArgument(writeChunkRequestProto.getStage()
-        == Stage.COMMIT_DATA);
+    Preconditions
+        .checkArgument(writeChunkRequestProto.getStage() == Stage.COMMIT_DATA);
 
     // prepare the chunk to be read
     ReadChunkRequestProto.Builder readChunkRequestProto =
@@ -286,8 +317,7 @@ public class ContainerStateMachine extends BaseStateMachine {
             .setChunkData(writeChunkRequestProto.getChunkData());
     ContainerCommandRequestProto dataContainerCommandProto =
         ContainerCommandRequestProto.newBuilder(requestProto)
-            .setCmdType(Type.ReadChunk)
-            .setReadChunk(readChunkRequestProto)
+            .setCmdType(Type.ReadChunk).setReadChunk(readChunkRequestProto)
             .build();
 
     // read the chunk
@@ -302,25 +332,13 @@ public class ContainerStateMachine extends BaseStateMachine {
     final WriteChunkRequestProto.Builder dataWriteChunkProto =
         WriteChunkRequestProto.newBuilder(writeChunkRequestProto)
             // adding the state machine data
-            .setData(responseProto.getData())
-            .setStage(Stage.WRITE_DATA);
+            .setData(responseProto.getData()).setStage(Stage.WRITE_DATA);
 
     ContainerCommandRequestProto.Builder newStateMachineProto =
         ContainerCommandRequestProto.newBuilder(requestProto)
             .setWriteChunk(dataWriteChunkProto);
 
-    return recreateLogEntryProto(entry,
-        newStateMachineProto.build().toByteString());
-  }
-
-  private LogEntryProto recreateLogEntryProto(LogEntryProto entry,
-      ByteString stateMachineData) {
-    // recreate the log entry
-    final SMLogEntryProto log =
-        SMLogEntryProto.newBuilder(entry.getSmLogEntry())
-            .setStateMachineData(stateMachineData)
-            .build();
-    return LogEntryProto.newBuilder(entry).setSmLogEntry(log).build();
+    return newStateMachineProto.build().toByteString();
   }
 
   /**
@@ -347,11 +365,11 @@ public class ContainerStateMachine extends BaseStateMachine {
    * evicted.
    */
   @Override
-  public CompletableFuture<LogEntryProto> readStateMachineData(
+  public CompletableFuture<ByteString> readStateMachineData(
       LogEntryProto entry) {
     SMLogEntryProto smLogEntryProto = entry.getSmLogEntry();
     if (!smLogEntryProto.getStateMachineData().isEmpty()) {
-      return CompletableFuture.completedFuture(entry);
+      return CompletableFuture.completedFuture(ByteString.EMPTY);
     }
 
     try {
@@ -365,9 +383,7 @@ public class ContainerStateMachine extends BaseStateMachine {
                 readStateMachineData(entry, requestProto),
             chunkExecutor);
       } else if (requestProto.getCmdType() == Type.CreateContainer) {
-        LogEntryProto log =
-            recreateLogEntryProto(entry, requestProto.toByteString());
-        return CompletableFuture.completedFuture(log);
+        return CompletableFuture.completedFuture(requestProto.toByteString());
       } else {
         throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
             + " cannot have state machine data");
@@ -387,13 +403,44 @@ public class ContainerStateMachine extends BaseStateMachine {
       metrics.incNumApplyTransactionsOps();
       ContainerCommandRequestProto requestProto =
           getRequestProto(trx.getSMLogEntry().getData());
-      Preconditions.checkState(!HddsUtils.isReadOnly(requestProto));
-      stateMachineMap.computeIfAbsent(requestProto.getContainerID(),
-          k -> new StateMachineHelper());
-      long index =
-          trx.getLogEntry() == null ? -1 : trx.getLogEntry().getIndex();
-      return stateMachineMap.get(requestProto.getContainerID())
-          .executeContainerCommand(requestProto, index);
+      Type cmdType = requestProto.getCmdType();
+      CompletableFuture<Message> future;
+      if (cmdType == Type.PutBlock) {
+        BlockData blockData;
+        ContainerProtos.BlockData blockDataProto =
+            requestProto.getPutBlock().getBlockData();
+
+        // set the blockCommitSequenceId
+        try {
+          blockData = BlockData.getFromProtoBuf(blockDataProto);
+        } catch (IOException ioe) {
+          LOG.error("unable to retrieve blockData info for Block {}",
+              blockDataProto.getBlockID());
+          return completeExceptionally(ioe);
+        }
+        blockData.setBlockCommitSequenceId(trx.getLogEntry().getIndex());
+        final ContainerProtos.PutBlockRequestProto putBlockRequestProto =
+            ContainerProtos.PutBlockRequestProto
+                .newBuilder(requestProto.getPutBlock())
+                .setBlockData(blockData.getProtoBufMessage()).build();
+        ContainerCommandRequestProto containerCommandRequestProto =
+            ContainerCommandRequestProto.newBuilder(requestProto)
+                .setPutBlock(putBlockRequestProto).build();
+        future = CompletableFuture
+            .supplyAsync(() -> runCommand(containerCommandRequestProto),
+                getCommandExecutor(requestProto));
+      } else {
+        future = CompletableFuture.supplyAsync(() -> runCommand(requestProto),
+            getCommandExecutor(requestProto));
+      }
+      // Mark the createContainerFuture complete so that writeStateMachineData
+      // for WriteChunk gets unblocked
+      if (cmdType == Type.CreateContainer) {
+        long containerID = requestProto.getContainerID();
+        future.thenApply(
+            r -> createContainerFutureMap.remove(containerID).complete(null));
+      }
+      return future;
     } catch (IOException e) {
       metrics.incNumApplyTransactionsFails();
       return completeExceptionally(e);
@@ -419,259 +466,8 @@ public class ContainerStateMachine extends BaseStateMachine {
 
   @Override
   public void close() throws IOException {
-  }
-
-  /**
-   * Class to manage the future tasks for writeChunks.
-   */
-  static class CommitChunkFutureMap {
-    private final ConcurrentHashMap<Long, CompletableFuture<Message>>
-        block2ChunkMap = new ConcurrentHashMap<>();
-
-    synchronized int removeAndGetSize(long index) {
-      block2ChunkMap.remove(index);
-      return block2ChunkMap.size();
+    for (int i = 0; i < numExecutors; i++){
+      executors[i].shutdown();
     }
-
-    synchronized CompletableFuture<Message> add(long index,
-        CompletableFuture<Message> future) {
-      return block2ChunkMap.put(index, future);
-    }
-
-    synchronized List<CompletableFuture<Message>> getAll() {
-      return new ArrayList<>(block2ChunkMap.values());
-    }
-  }
-
-  /**
-   * This class maintains maps and provide utilities to enforce synchronization
-   * among createContainer, writeChunk, putBlock and closeContainer.
-   */
-  private class StateMachineHelper {
-
-    private CompletableFuture<Message> createContainerFuture;
-
-    // Map for maintaining all writeChunk futures mapped to blockId
-    private final ConcurrentHashMap<Long, CommitChunkFutureMap>
-        block2ChunkMap;
-
-    // Map for putBlock futures
-    private final ConcurrentHashMap<Long, CompletableFuture<Message>>
-        blockCommitMap;
-
-    StateMachineHelper() {
-      createContainerFuture = null;
-      block2ChunkMap = new ConcurrentHashMap<>();
-      blockCommitMap = new ConcurrentHashMap<>();
-    }
-
-    // The following section handles writeStateMachineData transactions
-    // on a container
-
-    // enqueue the create container future during writeStateMachineData
-    // so that the write stateMachine data phase of writeChunk wait on
-    // create container to finish.
-    private CompletableFuture<Message> handleCreateContainer() {
-      createContainerFuture = new CompletableFuture<>();
-      return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
-    }
-
-    // This synchronizes on create container to finish
-    private CompletableFuture<Message> handleWriteChunk(
-        ContainerCommandRequestProto requestProto, long entryIndex) {
-      CompletableFuture<Message> containerOpFuture;
-
-      if (createContainerFuture != null) {
-        containerOpFuture = createContainerFuture
-            .thenApplyAsync(v -> runCommand(requestProto), chunkExecutor);
-      } else {
-        containerOpFuture = CompletableFuture
-            .supplyAsync(() -> runCommand(requestProto), chunkExecutor);
-      }
-      writeChunkFutureMap.put(entryIndex, containerOpFuture);
-      return containerOpFuture;
-    }
-
-    CompletableFuture<Message> handleStateMachineData(
-        final ContainerCommandRequestProto requestProto, long index) {
-      Type cmdType = requestProto.getCmdType();
-      if (cmdType == Type.CreateContainer) {
-        return handleCreateContainer();
-      } else if (cmdType == Type.WriteChunk) {
-        return handleWriteChunk(requestProto, index);
-      } else {
-        return null;
-      }
-    }
-
-    // The following section handles applyTransaction transactions
-    // on a container
-
-    private CompletableFuture<Message> handlePutBlock(
-        ContainerCommandRequestProto requestProto, long index) {
-      List<CompletableFuture<Message>> futureList = new ArrayList<>();
-      BlockData blockData = null;
-      ContainerProtos.BlockData blockDataProto =
-          requestProto.getPutBlock().getBlockData();
-
-      // set the blockCommitSequenceId
-      try {
-        blockData = BlockData.getFromProtoBuf(blockDataProto);
-      } catch (IOException ioe) {
-        LOG.error("unable to retrieve blockData info for Block {}",
-            blockDataProto.getBlockID());
-        return completeExceptionally(ioe);
-      }
-      blockData.setBlockCommitSequenceId(index);
-      final ContainerProtos.PutBlockRequestProto putBlockRequestProto =
-          ContainerProtos.PutBlockRequestProto
-              .newBuilder(requestProto.getPutBlock())
-              .setBlockData(blockData.getProtoBufMessage()).build();
-      ContainerCommandRequestProto containerCommandRequestProto =
-          ContainerCommandRequestProto.newBuilder(requestProto)
-              .setPutBlock(putBlockRequestProto).build();
-      long localId = blockDataProto.getBlockID().getLocalID();
-      // Need not wait for create container future here as it has already
-      // finished.
-      if (block2ChunkMap.get(localId) != null) {
-        futureList.addAll(block2ChunkMap.get(localId).getAll());
-      }
-      CompletableFuture<Message> effectiveFuture =
-          runCommandAfterFutures(futureList, containerCommandRequestProto);
-
-      CompletableFuture<Message> putBlockFuture =
-          effectiveFuture.thenApply(message -> {
-            blockCommitMap.remove(localId);
-            return message;
-          });
-      blockCommitMap.put(localId, putBlockFuture);
-      return putBlockFuture;
-    }
-
-    // Close Container should be executed only if all pending WriteType
-    // container cmds get executed. Transactions which can return a future
-    // are WriteChunk and PutBlock.
-    private CompletableFuture<Message> handleCloseContainer(
-        ContainerCommandRequestProto requestProto) {
-      List<CompletableFuture<Message>> futureList = new ArrayList<>();
-
-      // No need to wait for create container future here as it should have
-      // already finished.
-      block2ChunkMap.values().forEach(b -> futureList.addAll(b.getAll()));
-      futureList.addAll(blockCommitMap.values());
-
-      // There are pending write Chunk/PutBlock type requests
-      // Queue this closeContainer request behind all these requests
-      CompletableFuture<Message> closeContainerFuture =
-          runCommandAfterFutures(futureList, requestProto);
-
-      return closeContainerFuture.thenApply(message -> {
-        stateMachineMap.remove(requestProto.getContainerID());
-        return message;
-      });
-    }
-
-    private CompletableFuture<Message> handleChunkCommit(
-        ContainerCommandRequestProto requestProto, long index) {
-      WriteChunkRequestProto write = requestProto.getWriteChunk();
-      // the data field has already been removed in start Transaction
-      Preconditions.checkArgument(!write.hasData());
-      CompletableFuture<Message> stateMachineFuture =
-          writeChunkFutureMap.remove(index);
-      CompletableFuture<Message> commitChunkFuture = stateMachineFuture
-          .thenComposeAsync(v -> CompletableFuture
-              .completedFuture(runCommand(requestProto)));
-
-      long localId = requestProto.getWriteChunk().getBlockID().getLocalID();
-      // Put the applyTransaction Future again to the Map.
-      // closeContainer should synchronize with this.
-      block2ChunkMap
-          .computeIfAbsent(localId, id -> new CommitChunkFutureMap())
-          .add(index, commitChunkFuture);
-      return commitChunkFuture.thenApply(message -> {
-        block2ChunkMap.computeIfPresent(localId, (containerId, chunks)
-            -> chunks.removeAndGetSize(index) == 0? null: chunks);
-        return message;
-      });
-    }
-
-    private CompletableFuture<Message> runCommandAfterFutures(
-        List<CompletableFuture<Message>> futureList,
-        ContainerCommandRequestProto requestProto) {
-      CompletableFuture<Message> effectiveFuture;
-      if (futureList.isEmpty()) {
-        effectiveFuture = CompletableFuture
-            .supplyAsync(() -> runCommand(requestProto));
-
-      } else {
-        CompletableFuture<Void> allFuture = CompletableFuture.allOf(
-            futureList.toArray(new CompletableFuture[futureList.size()]));
-        effectiveFuture = allFuture
-            .thenApplyAsync(v -> runCommand(requestProto));
-      }
-      return effectiveFuture;
-    }
-
-    CompletableFuture<Message> handleCreateContainer(
-        ContainerCommandRequestProto requestProto) {
-      CompletableFuture<Message> future =
-          CompletableFuture.completedFuture(runCommand(requestProto));
-      future.thenAccept(m -> {
-        createContainerFuture.complete(m);
-        createContainerFuture = null;
-      });
-      return future;
-    }
-
-    CompletableFuture<Message> handleOtherCommands(
-        ContainerCommandRequestProto requestProto) {
-      return CompletableFuture.completedFuture(runCommand(requestProto));
-    }
-
-    CompletableFuture<Message> executeContainerCommand(
-        ContainerCommandRequestProto requestProto, long index) {
-      Type cmdType = requestProto.getCmdType();
-      switch (cmdType) {
-      case WriteChunk:
-        return handleChunkCommit(requestProto, index);
-      case CloseContainer:
-        return handleCloseContainer(requestProto);
-      case PutBlock:
-        return handlePutBlock(requestProto, index);
-      case CreateContainer:
-        return handleCreateContainer(requestProto);
-      default:
-        return handleOtherCommands(requestProto);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  public ConcurrentHashMap<Long, StateMachineHelper> getStateMachineMap() {
-    return stateMachineMap;
-  }
-
-  @VisibleForTesting
-  public CompletableFuture<Message> getCreateContainerFuture(long containerId) {
-    StateMachineHelper helper = stateMachineMap.get(containerId);
-    return helper == null ? null : helper.createContainerFuture;
-  }
-
-  @VisibleForTesting
-  public List<CompletableFuture<Message>> getCommitChunkFutureMap(
-      long containerId) {
-    StateMachineHelper helper = stateMachineMap.get(containerId);
-    if (helper != null) {
-      List<CompletableFuture<Message>> futureList = new ArrayList<>();
-      stateMachineMap.get(containerId).block2ChunkMap.values()
-          .forEach(b -> futureList.addAll(b.getAll()));
-      return futureList;
-    }
-    return null;
-  }
-
-  @VisibleForTesting
-  public Collection<CompletableFuture<Message>> getWriteChunkFutureMap() {
-    return writeChunkFutureMap.values();
   }
 }

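The net effect of the rewrite above: writeStateMachineData still pushes WriteChunk data through the shared chunkExecutor (gated on the container's create future, so a chunk write never races container creation), while applyTransaction routes every command to one of numExecutors single-threaded executors chosen by containerID % numExecutors. Same container, same executor, so transactions for one container apply in log order; different containers spread across executors and apply in parallel, which is what lets the old per-container StateMachineHelper bookkeeping go away. A standalone sketch of the routing pattern, assuming generic key and task types in place of the container protos:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.function.Supplier;

    final class PerKeySerializingPool {
      private final ExecutorService[] executors;

      PerKeySerializingPool(int numExecutors) {
        executors = new ExecutorService[numExecutors];
        for (int i = 0; i < numExecutors; i++) {
          // Single-threaded: tasks routed here run one at a time,
          // in submission order.
          executors[i] = Executors.newSingleThreadExecutor();
        }
      }

      <T> CompletableFuture<T> submit(long key, Supplier<T> task) {
        // Same key -> same executor -> serialized execution; distinct keys
        // spread across executors and may run concurrently.
        int idx = (int) Math.floorMod(key, (long) executors.length);
        return CompletableFuture.supplyAsync(task, executors[idx]);
      }

      void shutdown() {
        for (ExecutorService executor : executors) {
          executor.shutdown();
        }
      }
    }

The patch indexes with a plain (int)(containerID % numExecutors); Math.floorMod in the sketch only guards against a negative key in the generic case.
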
+ 10 - 5
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java

@@ -56,9 +56,9 @@ import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.shaded.proto.RaftProtos;
-import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
@@ -97,6 +97,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   private final StateContext context;
   private final ReplicationLevel replicationLevel;
   private long nodeFailureTimeoutMs;
+  private ContainerStateMachine stateMachine;
 
   private XceiverServerRatis(DatanodeDetails dd, int port,
       ContainerDispatcher dispatcher, Configuration conf, StateContext context)
@@ -112,12 +113,15 @@ public final class XceiverServerRatis implements XceiverServerSpi {
             100, TimeUnit.SECONDS,
             new ArrayBlockingQueue<>(1024),
             new ThreadPoolExecutor.CallerRunsPolicy());
+    final int numContainerOpExecutors = conf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT);
     this.context = context;
     this.replicationLevel =
         conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
             OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);
-    ContainerStateMachine stateMachine =
-        new ContainerStateMachine(dispatcher, chunkExecutor, this);
+    stateMachine = new ContainerStateMachine(dispatcher, chunkExecutor, this,
+        numContainerOpExecutors);
     this.server = RaftServer.newBuilder()
         .setServerId(RatisHelper.toRaftPeerId(dd))
         .setProperties(serverProperties)
@@ -292,6 +296,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   public void stop() {
     try {
       chunkExecutor.shutdown();
+      stateMachine.close();
       server.close();
     } catch (IOException e) {
       throw new RuntimeException(e);

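The hunks above wire the new setting through: the executor count is read once when the server is constructed, the state machine is kept in a field, and stop() now closes it (shutting down the per-container executors) before closing the Raft server. The patch's close() calls shutdown() without waiting; a small illustrative helper in the same spirit, if draining queued container ops before termination were desired (awaitTermination and the timeout are assumptions, not part of the patch):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    final class ExecutorShutdownUtil {
      // Stop accepting new tasks, give queued tasks a bounded grace period
      // to finish, then force termination if they have not drained.
      static void shutdownGracefully(ExecutorService executor) {
        executor.shutdown();
        try {
          if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            executor.shutdownNow();
          }
        } catch (InterruptedException e) {
          executor.shutdownNow();
          Thread.currentThread().interrupt();
        }
      }
    }
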
+ 1 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java

@@ -35,7 +35,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;

+ 1 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java

@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos

+ 1 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java

@@ -27,7 +27,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoo
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.ratis.shaded.com.google.protobuf
+import org.apache.ratis.thirdparty.com.google.protobuf
     .InvalidProtocolBufferException;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;

+ 3 - 3
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java

@@ -36,9 +36,9 @@ import org.apache.hadoop.hdds.protocol.datanode.proto
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 
 import com.google.common.base.Preconditions;
-import org.apache.ratis.shaded.io.grpc.ManagedChannel;
-import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

+ 2 - 2
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java

@@ -30,8 +30,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.protocol.datanode.proto
     .IntraDatanodeProtocolServiceGrpc;
 
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

+ 1 - 1
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java

@@ -40,7 +40,7 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;

+ 5 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java

@@ -712,6 +712,11 @@ public class ChunkGroupOutputStream extends OutputStream {
       if (this.outputStream instanceof ChunkOutputStream) {
         ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
         return out.getBlockCommitSequenceId();
+      } else if (outputStream == null) {
+        // For a pre allocated block for which no write has been initiated,
+        // the OutputStream will be null here.
+        // In such cases, the default blockCommitSequenceId will be 0
+        return 0;
       }
       throw new IOException("Invalid Output Stream for Key: " + key);
     }

+ 0 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java

@@ -341,7 +341,6 @@ public class TestCloseContainerHandlingByClient {
     Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
     // With the initial size provided, it should have pre allocated 2 blocks
     Assert.assertEquals(2, groupOutputStream.getStreamEntries().size());
-    Assert.assertEquals(2, groupOutputStream.getLocationInfoList().size());
     String dataString = fixedLengthString(keyString, (1 * blockSize));
     byte[] data = dataString.getBytes();
     key.write(data);

+ 2 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java

@@ -25,7 +25,7 @@ import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -439,6 +439,7 @@ public final class ContainerTestHelper {
     List<ContainerProtos.ChunkInfo> newList = new LinkedList<>();
     newList.add(writeRequest.getChunkData());
     blockData.setChunks(newList);
+    blockData.setBlockCommitSequenceId(0);
     putRequest.setBlockData(blockData.getProtoBufMessage());
 
     ContainerCommandRequestProto.Builder request =

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java

@@ -32,7 +32,7 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;

+ 0 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java

@@ -24,7 +24,6 @@ import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
 import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
-import org.apache.ratis.shaded.io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos

+ 0 - 201
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java

@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.server;
-
-
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerType;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis
-    .ContainerStateMachine;
-import org.apache.ratis.RatisHelper;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.shaded.proto.RaftProtos;
-import org.apache.ratis.statemachine.TransactionContext;
-import org.apache.ratis.util.ProtoUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Random;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * This class tests ContainerStateMachine.
- */
-public class TestContainerStateMachine {
-
-  private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
-
-  private static long nextCallId() {
-    return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
-  }
-
-  private ThreadPoolExecutor executor =
-      new ThreadPoolExecutor(4, 4, 100, TimeUnit.SECONDS,
-          new ArrayBlockingQueue<>(1024),
-          new ThreadPoolExecutor.CallerRunsPolicy());
-  private ContainerStateMachine stateMachine =
-      new ContainerStateMachine(new TestContainerDispatcher(), executor, null);
-
-
-  @Test
-  public void testCloseContainerSynchronization() throws Exception {
-    Pipeline pipeline = ContainerTestHelper.createPipeline(3);
-    long containerId = new Random().nextLong();
-
-    //create container request
-    RaftClientRequest createContainer = getRaftClientRequest(
-        ContainerTestHelper.getCreateContainerRequest(containerId, pipeline));
-
-    ContainerCommandRequestProto writeChunkProto = ContainerTestHelper
-        .getWriteChunkRequest(pipeline, new BlockID(containerId, nextCallId()),
-            1024);
-
-    RaftClientRequest writeChunkRequest = getRaftClientRequest(writeChunkProto);
-
-    // add putKey request
-    ContainerCommandRequestProto putKeyProto = ContainerTestHelper
-            .getPutBlockRequest(pipeline, writeChunkProto.getWriteChunk());
-    RaftClientRequest putKeyRequest = getRaftClientRequest(putKeyProto);
-
-    TransactionContext createContainerCtxt =
-        startAndWriteStateMachineData(createContainer);
-    // Start and Write into the StateMachine
-    TransactionContext writeChunkcontext =
-        startAndWriteStateMachineData(writeChunkRequest);
-
-    TransactionContext putKeyContext =
-        stateMachine.startTransaction(putKeyRequest);
-    Assert.assertEquals(1, stateMachine.getStateMachineMap().size());
-    Assert.assertNotNull(stateMachine.getCreateContainerFuture(containerId));
-    Assert.assertEquals(1,
-        stateMachine.getWriteChunkFutureMap().size());
-    Assert.assertTrue(
-        stateMachine.getCommitChunkFutureMap(containerId).isEmpty());
-
-    //Add a closeContainerRequest
-    RaftClientRequest closeRequest = getRaftClientRequest(
-        ContainerTestHelper.getCloseContainer(pipeline, containerId));
-
-    TransactionContext closeCtx = stateMachine.startTransaction(closeRequest);
-
-    // Now apply all the transaction for the CreateContainer Command.
-    // This will unblock writeChunks as well
-
-    stateMachine.applyTransaction(createContainerCtxt);
-    stateMachine.applyTransaction(writeChunkcontext);
-    CompletableFuture<Message> putKeyFuture =
-        stateMachine.applyTransaction(putKeyContext);
-    waitForTransactionCompletion(putKeyFuture);
-    // Make sure the putKey transaction complete
-    Assert.assertTrue(putKeyFuture.isDone());
-
-    // Execute the closeContainer. This should ensure all prior Write Type
-    // container requests finish execution
-
-    CompletableFuture<Message> closeFuture =
-        stateMachine.applyTransaction(closeCtx);
-    waitForTransactionCompletion(closeFuture);
-    // Make sure the closeContainer transaction complete
-    Assert.assertTrue(closeFuture.isDone());
-    Assert.assertNull(stateMachine.getCreateContainerFuture(containerId));
-    Assert.assertNull(stateMachine.getCommitChunkFutureMap(containerId));
-
-  }
-
-  private RaftClientRequest getRaftClientRequest(
-      ContainerCommandRequestProto req) throws IOException {
-    ClientId clientId = ClientId.randomId();
-    return new RaftClientRequest(clientId,
-        RatisHelper.toRaftPeerId(ContainerTestHelper.createDatanodeDetails()),
-        RatisHelper.emptyRaftGroup().getGroupId(), nextCallId(), 0,
-        Message.valueOf(req.toByteString()), RaftClientRequest
-        .writeRequestType(RaftProtos.ReplicationLevel.MAJORITY));
-  }
-
-  private void waitForTransactionCompletion(
-      CompletableFuture<Message> future) throws Exception {
-    ExecutorService executorService = Executors.newSingleThreadExecutor();
-    executorService
-        .invokeAll(Collections.singleton(future::get), 10,
-            TimeUnit.SECONDS); // Timeout of 10 minutes.
-    executorService.shutdown();
-  }
-
-  private TransactionContext startAndWriteStateMachineData(
-      RaftClientRequest request) throws IOException {
-    TransactionContext ctx = stateMachine.startTransaction(request);
-    RaftProtos.LogEntryProto e = ProtoUtils
-        .toLogEntryProto(ctx.getSMLogEntry(), request.getSeqNum(),
-            request.getCallId(), ClientId.randomId(), request.getCallId());
-    ctx.setLogEntry(e);
-    stateMachine.writeStateMachineData(e);
-    return ctx;
-  }
-
-  // ContainerDispatcher for test only purpose.
-  private static class TestContainerDispatcher implements ContainerDispatcher {
-    /**
-     * Dispatches commands to container layer.
-     *
-     * @param msg - Command Request
-     * @return Command Response
-     */
-    @Override
-    public ContainerCommandResponseProto dispatch(
-        ContainerCommandRequestProto msg) {
-      return ContainerTestHelper.getCreateContainerResponse(msg);
-    }
-
-    @Override
-    public void init() {
-    }
-
-    @Override
-    public void shutdown() {
-    }
-
-    @Override
-    public void setScmId(String scmId) {
-    }
-
-    @Override
-    public Handler getHandler(ContainerType containerType) {
-      return null;
-    }
-  }
-}

+ 1 - 1
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java

@@ -23,7 +23,7 @@ import org.apache.hadoop.ozone.container.common.statemachine
     .DatanodeStateMachine.DatanodeStates;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomStringUtils;

+ 1 - 1
hadoop-project/pom.xml

@@ -103,7 +103,7 @@
     <ldap-api.version>1.0.0-M33</ldap-api.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>0.3.0-eca3531-SNAPSHOT</ratis.version>
+    <ratis.version>0.3.0-9b84d79-SNAPSHOT</ratis.version>
     <jcache.version>1.0-alpha-1</jcache.version>
     <ehcache.version>3.3.1</ehcache.version>
     <hikari.version>2.4.12</hikari.version>