
HDFS-11519. Ozone: Implement XceiverServerSpi and XceiverClientSpi using Ratis. Contributed by Tsz Wo Nicholas Sze.

Anu Engineer 8 years ago
parent
commit
558b478ff2
20 changed files with 784 additions and 60 deletions
  1. +3 -0    hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml
  2. +25 -0   hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
  3. +9 -0    hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
  4. +7 -1    hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
  5. +111 -0  hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
  6. +10 -0   hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java
  7. +38 -0   hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java
  8. +5 -0    hadoop-hdfs-project/hadoop-hdfs/pom.xml
  9. +16 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
  10. +8 -3   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
  11. +107 -0 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
  12. +119 -0 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
  13. +8 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
  14. +21 -17 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
  15. +1 -1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
  16. +28 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
  17. +61 -3  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
  18. +76 -13 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
  19. +88 -20 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
  20. +43 -1  hadoop-project/pom.xml

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs-client/dev-support/findbugsExcludeFile.xml

@@ -64,4 +64,7 @@
   <Match>
     <Package name="org.apache.hadoop.ozone.protocol.proto" />
   </Match>
+  <Match>
+    <Package name="org.apache.hadoop.hdfs.ozone.protocol.proto" />
+  </Match>
 </FindBugsFilter>

+ 25 - 0
hadoop-hdfs-project/hadoop-hdfs-client/pom.xml

@@ -118,6 +118,31 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.ratis</groupId>
+      <artifactId>ratis-proto-shaded</artifactId>
+    </dependency>
+    <dependency>
+      <artifactId>ratis-common</artifactId>
+      <groupId>org.apache.ratis</groupId>
+    </dependency>
+    <dependency>
+      <artifactId>ratis-client</artifactId>
+      <groupId>org.apache.ratis</groupId>
+    </dependency>
+    <dependency>
+      <artifactId>ratis-server</artifactId>
+      <groupId>org.apache.ratis</groupId>
+    </dependency>
+    <dependency>
+      <artifactId>ratis-netty</artifactId>
+      <groupId>org.apache.ratis</groupId>
+    </dependency>
+    <dependency>
+      <artifactId>ratis-grpc</artifactId>
+      <groupId>org.apache.ratis</groupId>
+    </dependency>
   </dependencies>
 
   <build>

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java

@@ -35,6 +35,15 @@ public final class ScmConfigKeys {
   public static final int SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT =
       10000;
 
+  public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
+      = "dfs.container.ratis.enabled";
+  public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
+      = false;
+  public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY
+      = "dfs.container.ratis.rpc.type";
+  public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT
+      = "GRPC";
+
   // TODO : this is copied from OzoneConsts, may need to move to a better place
   public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB
 

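These keys gate the new transport. A minimal client-side sketch, assuming only the keys and defaults defined above (this snippet is not part of the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.scm.ScmConfigKeys;

    // Enable the Ratis transport for container I/O (sketch).
    Configuration conf = new Configuration();
    conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, true);  // default: false
    conf.set(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, "NETTY");     // a SupportedRpcType name; default: "GRPC"
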
+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java

@@ -52,6 +52,7 @@ public class XceiverClientManager {
   private final Configuration conf;
   private Cache<String, XceiverClientWithAccessInfo> openClient;
   private final long staleThresholdMs;
+  private final boolean useRatis;
 
   /**
    * Creates a new XceiverClientManager.
@@ -63,6 +64,9 @@ public class XceiverClientManager {
     this.staleThresholdMs = conf.getTimeDuration(
         SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY,
         SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT, TimeUnit.MILLISECONDS);
+    this.useRatis = conf.getBoolean(
+        ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
+        ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
     this.conf = conf;
     this.openClient = CacheBuilder.newBuilder()
         .expireAfterAccess(this.staleThresholdMs, TimeUnit.MILLISECONDS)
@@ -109,7 +113,9 @@ public class XceiverClientManager {
       return info.getXceiverClient();
     } else {
       // connection not found, create new, add reference and return
-      XceiverClientSpi xceiverClient = new XceiverClient(pipeline, conf);
+      final XceiverClientSpi xceiverClient = useRatis?
+          XceiverClientRatis.newXceiverClientRatis(pipeline, conf)
+          : new XceiverClient(pipeline, conf);
       try {
         xceiverClient.connect();
       } catch (Exception e) {

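When the flag is on, the manager constructs XceiverClientRatis instead of XceiverClient on a cache miss, so callers are unchanged. A hedged sketch: acquireClient is a hypothetical name standing in for the manager's lookup method, and the Configuration-taking constructor is assumed from the Javadoc above; neither signature is fully shown in this hunk.

    // Hypothetical usage; the real accessor name is not visible in this diff.
    XceiverClientManager manager = new XceiverClientManager(conf);
    XceiverClientSpi client = manager.acquireClient(pipeline);  // Ratis-backed when enabled
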
+ 111 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java

@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.scm;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.ratis.client.ClientFactory;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * An implementation of {@link XceiverClientSpi} using Ratis.
+ * The underlying RPC mechanism can be chosen via the constructor.
+ */
+public final class XceiverClientRatis implements XceiverClientSpi {
+  static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class);
+
+  public static XceiverClientRatis newXceiverClientRatis(
+      Pipeline pipeline, Configuration ozoneConf) {
+    final String rpcType = ozoneConf.get(
+        ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+        ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    return new XceiverClientRatis(pipeline,
+        SupportedRpcType.valueOfIgnoreCase(rpcType));
+  }
+
+  private final Pipeline pipeline;
+  private final RaftClient client;
+
+  /** Constructs a client. */
+  XceiverClientRatis(Pipeline pipeline, RpcType rpcType) {
+    this.pipeline = pipeline;
+    final List<RaftPeer> peers = pipeline.getMachines().stream()
+        .map(dn -> dn.getXferAddr())
+        .map(addr -> new RaftPeer(new RaftPeerId(addr), addr))
+        .collect(Collectors.toList());
+
+    final RaftProperties properties = new RaftProperties();
+    final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(
+        properties, null));
+
+    client = RaftClient.newBuilder()
+        .setClientRpc(factory.newRaftClientRpc())
+        .setServers(peers)
+        .setLeaderId(new RaftPeerId(pipeline.getLeader().getXferAddr()))
+        .setProperties(properties)
+        .build();
+  }
+
+  @Override
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  @Override
+  public void connect() throws Exception {
+    // do nothing.
+  }
+
+  @Override
+  public void close() {
+    try {
+      client.close();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public ContainerCommandResponseProto sendCommand(
+      ContainerCommandRequestProto request) throws IOException {
+    LOG.debug("sendCommand {}", request);
+    final RaftClientReply reply = client.send(
+        () -> ShadedProtoUtil.asShadedByteString(request.toByteArray()));
+    LOG.debug("reply {}", reply);
+    Preconditions.checkState(reply.isSuccess());
+    return ContainerCommandResponseProto.parseFrom(
+        ShadedProtoUtil.asByteString(reply.getMessage().getContent()));
+  }
+}

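End to end, the client is created from a Pipeline plus configuration and then used like the Netty-based XceiverClient. A sketch using only the methods defined above; the pipeline and request construction are assumed to come from elsewhere (e.g. the test helpers later in this commit):

    // Send one container command over Ratis (sketch).
    XceiverClientRatis client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
    client.connect();  // no-op for the Ratis client
    try {
      ContainerCommandResponseProto reply = client.sendCommand(request);
    } finally {
      client.close();  // closes the underlying RaftClient
    }

sendCommand serializes the request, wraps it as a shaded ByteString, checks reply.isSuccess(), and parses the shaded reply back into a ContainerCommandResponseProto.
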
+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java

@@ -155,4 +155,14 @@ public class Pipeline {
       return null;
     }
   }
+
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(getClass().getSimpleName())
+        .append("[");
+    datanodes.keySet().stream()
+        .forEach(id -> b.append(id.endsWith(leaderID)? "*" + id : id));
+    b.append("] container:").append(containerName);
+    return b.toString();
+  }
 }

+ 38 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.shaded.com.google.protobuf;
+
+/** Utilities for the shaded protobuf in Ratis. */
+public interface ShadedProtoUtil {
+  /**
+   * @param bytes the bytes to wrap
+   * @return the wrapped shaded {@link ByteString} (no copying).
+   */
+  static ByteString asShadedByteString(byte[] bytes) {
+    return ByteString.wrap(bytes);
+  }
+
+  /**
+   * @param shaded the shaded {@link ByteString} to copy
+   * @return a {@link com.google.protobuf.ByteString} (requires copying).
+   */
+  static com.google.protobuf.ByteString asByteString(ByteString shaded) {
+    return com.google.protobuf.ByteString.copyFrom(
+        shaded.asReadOnlyByteBuffer());
+  }
+}

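Because Ratis bundles its own shaded protobuf, every message crosses a shaded/unshaded boundary. A round-trip sketch built from the two utilities above:

    // Convert between Hadoop protobuf and Ratis' shaded protobuf (sketch).
    byte[] bytes = request.toByteArray();  // unshaded message bytes
    org.apache.ratis.shaded.com.google.protobuf.ByteString shaded =
        ShadedProtoUtil.asShadedByteString(bytes);   // zero-copy wrap
    com.google.protobuf.ByteString unshaded =
        ShadedProtoUtil.asByteString(shaded);        // copies the bytes back

This is the same conversion XceiverClientRatis.sendCommand and ContainerStateMachine perform on each request and response.
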
+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/pom.xml

@@ -214,6 +214,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <version>2.5.3</version>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.jctools</groupId>
+      <artifactId>jctools-core</artifactId>
+      <optional>true</optional>
+    </dependency>
   </dependencies>
 
   <build>

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.scm.ScmConfigKeys;
 
 /**
  * This class contains constants for configuration keys used in Ozone.
@@ -67,6 +68,21 @@ public final class OzoneConfigKeys {
       "ozone.container.task.wait.seconds";
   public static final long OZONE_CONTAINER_TASK_WAIT_DEFAULT = 5;
 
+  public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
+  public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
+  public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY;
+  public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT;
+  public static final String DFS_CONTAINER_RATIS_CONF =
+      "dfs.container.ratis.conf";
+  public static final String DFS_CONTAINER_RATIS_DATANODE_ADDRESS =
+      "dfs.container.ratis.datanode.address";
+  public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =
+      "dfs.container.ratis.datanode.storage.dir";
+
   /**
    * There is no need to instantiate this class.
    */

+ 8 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java

@@ -598,10 +598,15 @@ public class ContainerManagerImpl implements ContainerManager {
    */
   @Override
   public boolean isOpen(String containerName) throws StorageContainerException {
-    ContainerData cData = containerMap.get(containerName).getContainer();
+    final ContainerStatus status = containerMap.get(containerName);
+    if (status == null) {
+      throw new StorageContainerException(
+          "Container status not found: " + containerName, CONTAINER_NOT_FOUND);
+    }
+    final ContainerData cData = status.getContainer();
     if (cData == null) {
-      throw new StorageContainerException("Container not found",
-          CONTAINER_NOT_FOUND);
+      throw new StorageContainerException(
+          "Container not found: " + containerName, CONTAINER_NOT_FOUND);
     }
     return cData.isOpen();
   }

+ 107 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;
+import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
+import org.apache.ratis.statemachine.BaseStateMachine;
+import org.apache.ratis.statemachine.SimpleStateMachineStorage;
+import org.apache.ratis.statemachine.StateMachineStorage;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/** A {@link org.apache.ratis.statemachine.StateMachine} for containers. */
+public class ContainerStateMachine extends BaseStateMachine {
+  static final Logger LOG = LoggerFactory.getLogger(
+      ContainerStateMachine.class);
+  private final SimpleStateMachineStorage storage
+      = new SimpleStateMachineStorage();
+  private final ContainerDispatcher dispatcher;
+
+  ContainerStateMachine(ContainerDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public StateMachineStorage getStateMachineStorage() {
+    return storage;
+  }
+
+  @Override
+  public void initialize(
+      RaftPeerId id, RaftProperties properties, RaftStorage raftStorage)
+      throws IOException {
+    super.initialize(id, properties, raftStorage);
+    storage.init(raftStorage);
+    // TODO: handle snapshots
+  }
+
+  @Override
+  public CompletableFuture<RaftClientReply> query(RaftClientRequest request) {
+    return dispatch(ShadedProtoUtil.asByteString(
+        request.getMessage().getContent()),
+        response -> new RaftClientReply(request,
+            () -> ShadedProtoUtil.asShadedByteString(response.toByteArray())));
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    final SMLogEntryProto logEntry = trx.getSMLogEntry().get();
+    return dispatch(ShadedProtoUtil.asByteString(logEntry.getData()),
+        response ->
+            () -> ShadedProtoUtil.asShadedByteString(response.toByteArray())
+    );
+  }
+
+  private <T> CompletableFuture<T> dispatch(
+      ByteString requestBytes, Function<ContainerCommandResponseProto, T> f) {
+    final ContainerCommandResponseProto response;
+    try {
+      final ContainerCommandRequestProto request
+          = ContainerCommandRequestProto.parseFrom(requestBytes);
+      LOG.trace("dispatch {}", request);
+      response = dispatcher.dispatch(request);
+      LOG.trace("response {}", response);
+    } catch (IOException e) {
+      return completeExceptionally(e);
+    }
+    return CompletableFuture.completedFuture(f.apply(response));
+  }
+
+  static <T> CompletableFuture<T> completeExceptionally(Exception e) {
+    final CompletableFuture<T> future = new CompletableFuture<>();
+    future.completeExceptionally(e);
+    return future;
+  }
+}

+ 119 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java

@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Creates a Ratis server endpoint that acts as the communication layer for
+ * Ozone containers.
+ */
+public final class XceiverServerRatis implements XceiverServerSpi {
+  static RaftProperties newRaftProperties(
+      RpcType rpc, int port, String storageDir) {
+    final RaftProperties properties = new RaftProperties();
+    RaftServerConfigKeys.setStorageDir(properties, storageDir);
+    RaftConfigKeys.Rpc.setType(properties, rpc);
+    if (rpc == SupportedRpcType.GRPC) {
+      GrpcConfigKeys.Server.setPort(properties, port);
+    } else if (rpc == SupportedRpcType.NETTY) {
+      NettyConfigKeys.Server.setPort(properties, port);
+    }
+    return properties;
+  }
+
+  public static XceiverServerRatis newXceiverServerRatis(
+      Configuration ozoneConf, ContainerDispatcher dispatcher)
+      throws IOException {
+    final String id = ozoneConf.get(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS);
+    final Collection<String> servers = ozoneConf.getStringCollection(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF);
+    final String storageDir = ozoneConf.get(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
+    final String rpcType = ozoneConf.get(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
+    return new XceiverServerRatis(id, servers, storageDir, dispatcher, rpc);
+  }
+
+  private final int port;
+  private final RaftServer server;
+
+  private XceiverServerRatis(
+      String id, Collection<String> servers, String storageDir,
+      ContainerDispatcher dispatcher, RpcType rpcType) throws IOException {
+    Preconditions.checkArgument(servers.contains(id),
+        "%s is not one of %s specified in %s",
+        id, servers, OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF);
+
+    final List<RaftPeer> peers = servers.stream()
+        .map(addr -> new RaftPeer(new RaftPeerId(addr), addr))
+        .collect(Collectors.toList());
+
+    this.port = NetUtils.createSocketAddr(id).getPort();
+
+    this.server = RaftServer.newBuilder()
+        .setServerId(new RaftPeerId(id))
+        .setPeers(peers)
+        .setProperties(newRaftProperties(rpcType, port, storageDir))
+        .setStateMachine(new ContainerStateMachine(dispatcher))
+        .build();
+  }
+
+  @Override
+  public void start() throws IOException {
+    server.start();
+  }
+
+  @Override
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public int getIPCPort() {
+    return port;
+  }
+}

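On the datanode, the server is wired entirely from configuration. A hedged sketch with placeholder addresses and storage directory; dispatcher is assumed to be an existing ContainerDispatcher:

    // Start a Ratis-backed container server (sketch, placeholder values).
    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS, "127.0.0.1:9858");
    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF,
        "127.0.0.1:9858,127.0.0.1:9859,127.0.0.1:9860");  // all Raft peers; must include this datanode
    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, "/tmp/ratis");
    XceiverServerSpi server = XceiverServerRatis.newXceiverServerRatis(conf, dispatcher);
    server.start();  // getIPCPort() returns the port parsed from the datanode address

Note the precondition enforced in the constructor: the datanode's own address must appear in the peer list given by DFS_CONTAINER_RATIS_CONF.
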
+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
 import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -86,7 +87,13 @@ public class OzoneContainer {
     manager.setKeyManager(this.keyManager);
 
     this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
-    server = new XceiverServer(this.ozoneConfig, this.dispatcher);
+
+    final boolean useRatis = ozoneConfig.getBoolean(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
+    server = useRatis?
+        XceiverServerRatis.newXceiverServerRatis(ozoneConfig, dispatcher)
+        : new XceiverServer(this.ozoneConfig, this.dispatcher);
   }
 
   /**

+ 21 - 17
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -1567,7 +1567,7 @@ public class MiniDFSCluster implements AutoCloseable {
         dnConf.addResource(dnConfOverlays[i]);
       }
       // Set up datanode address
-      setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
+      setupDatanodeAddress(i, dnConf, setupHostsFile, checkDataNodeAddrConfig);
       if (manageDfsDirs) {
         String dirs = makeDataNodeDirs(i, storageTypes == null ?
           null : storageTypes[i - curDatanodesNum]);
@@ -2831,16 +2831,19 @@ public class MiniDFSCluster implements AutoCloseable {
 
   /**
    * Get a storage directory for a datanode.
+   * For example,
    * <ol>
-   * <li><base directory>/data/data<2*dnIndex + 1></li>
-   * <li><base directory>/data/data<2*dnIndex + 2></li>
+   * <li><base directory>/data/dn0_data0</li>
+   * <li><base directory>/data/dn0_data1</li>
+   * <li><base directory>/data/dn1_data0</li>
+   * <li><base directory>/data/dn1_data1</li>
    * </ol>
    *
    * @param dnIndex datanode index (starts from 0)
    * @param dirIndex directory index.
    * @return Storage directory
    */
-  public File getStorageDir(int dnIndex, int dirIndex) {
+  public static File getStorageDir(int dnIndex, int dirIndex) {
     return new File(getBaseDirectory(), getStorageDirPath(dnIndex, dirIndex));
   }
 
@@ -2851,8 +2854,8 @@ public class MiniDFSCluster implements AutoCloseable {
    * @param dirIndex directory index.
    * @return storage directory path
    */
-  private String getStorageDirPath(int dnIndex, int dirIndex) {
-    return "data/data" + (storagesPerDatanode * dnIndex + 1 + dirIndex);
+  private static String getStorageDirPath(int dnIndex, int dirIndex) {
+    return "data/dn" + dnIndex + "_data" + dirIndex;
   }
 
   /**
@@ -3084,35 +3087,36 @@ public class MiniDFSCluster implements AutoCloseable {
     waitActive(nnIndex);
   }
   
-  protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
-                           boolean checkDataNodeAddrConfig) throws IOException {
+  protected void setupDatanodeAddress(
+      int i, Configuration dnConf, boolean setupHostsFile,
+      boolean checkDataNodeAddrConfig) throws IOException {
     if (setupHostsFile) {
-      String hostsFile = conf.get(DFS_HOSTS, "").trim();
+      String hostsFile = dnConf.get(DFS_HOSTS, "").trim();
       if (hostsFile.length() == 0) {
         throw new IOException("Parameter dfs.hosts is not setup in conf");
       }
       // Setup datanode in the include file, if it is defined in the conf
       String address = "127.0.0.1:" + NetUtils.getFreeSocketPort();
       if (checkDataNodeAddrConfig) {
-        conf.setIfUnset(DFS_DATANODE_ADDRESS_KEY, address);
+        dnConf.setIfUnset(DFS_DATANODE_ADDRESS_KEY, address);
       } else {
-        conf.set(DFS_DATANODE_ADDRESS_KEY, address);
+        dnConf.set(DFS_DATANODE_ADDRESS_KEY, address);
       }
       addToFile(hostsFile, address);
       LOG.info("Adding datanode " + address + " to hosts file " + hostsFile);
     } else {
       if (checkDataNodeAddrConfig) {
-        conf.setIfUnset(DFS_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
+        dnConf.setIfUnset(DFS_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
       } else {
-        conf.set(DFS_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
+        dnConf.set(DFS_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
       }
     }
     if (checkDataNodeAddrConfig) {
-      conf.setIfUnset(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
-      conf.setIfUnset(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0");
+      dnConf.setIfUnset(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
+      dnConf.setIfUnset(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0");
     } else {
-      conf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
-      conf.set(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0");
+      dnConf.set(DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
+      dnConf.set(DFS_DATANODE_IPC_ADDRESS_KEY, "127.0.0.1:0");
     }
   }
   

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java

@@ -117,7 +117,7 @@ public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
     for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
       Configuration dnConf = new HdfsConfiguration(conf);
       // Set up datanode address
-      setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
+      setupDatanodeAddress(i, dnConf, setupHostsFile, checkDataNodeAddrConfig);
       if (manageDfsDirs) {
         String dirs = makeDataNodeDirs(i, storageTypes == null ? null : storageTypes[i]);
         dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);

+ 28 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java

@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
@@ -85,6 +86,33 @@ public final class MiniOzoneCluster extends MiniDFSCluster
     tempPath = Paths.get(builder.getPath(), builder.getRunID());
   }
 
+  @Override
+  protected void setupDatanodeAddress(
+      int i, Configuration dnConf, boolean setupHostsFile,
+      boolean checkDnAddrConf) throws IOException {
+    super.setupDatanodeAddress(i, dnConf, setupHostsFile, checkDnAddrConf);
+
+    final boolean useRatis = dnConf.getBoolean(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
+    if (!useRatis) {
+      return;
+    }
+    final String[] ids = dnConf.getStrings(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF);
+    // TODO: use the i-th raft server as the i-th datanode address
+    //       this only works for one Raft cluster
+    setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS,
+        ids[i]);
+    setConf(i, dnConf, OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR,
+        getInstanceStorageDir(i, -1).getCanonicalPath());
+  }
+
+  static void setConf(int i, Configuration conf, String key, String value) {
+    conf.set(key, value);
+    LOG.info("dn{}: set {} = {}", i, key, value);
+  }
+
   @Override
   public void close() {
     shutdown();

+ 61 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java

@@ -21,17 +21,22 @@ package org.apache.hadoop.ozone.container;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.ratis.rpc.RpcType;
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.ServerSocket;
@@ -42,11 +47,14 @@ import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Helpers for container tests.
  */
 public final class ContainerTestHelper {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      ContainerTestHelper.class);
   private static Random r = new Random();
 
   /**
@@ -64,19 +72,50 @@ public final class ContainerTestHelper {
    */
   public static Pipeline createSingleNodePipeline(String containerName) throws
       IOException {
+    return createPipeline(containerName, 1);
+  }
+
+  public static DatanodeID createDatanodeID() throws IOException {
     ServerSocket socket = new ServerSocket(0);
     int port = socket.getLocalPort();
     DatanodeID datanodeID = new DatanodeID(socket.getInetAddress()
         .getHostAddress(), socket.getInetAddress().getHostName(),
         UUID.randomUUID().toString(), port, port, port, port);
     datanodeID.setContainerPort(port);
-    Pipeline pipeline = new Pipeline(datanodeID.getDatanodeUuid());
-    pipeline.addMember(datanodeID);
-    pipeline.setContainerName(containerName);
     socket.close();
+    return datanodeID;
+  }
+
+  /**
+   * Create a pipeline with the given number of datanodes.
+   *
+   * @return a Pipeline with numNodes datanodes in it.
+   * @throws IOException
+   */
+  public static Pipeline createPipeline(String containerName, int numNodes)
+      throws IOException {
+    Preconditions.checkArgument(numNodes >= 1);
+    final DatanodeID leader = createDatanodeID();
+    Pipeline pipeline = new Pipeline(leader.getDatanodeUuid());
+    pipeline.setContainerName(containerName);
+    pipeline.addMember(leader);
+
+    for(int i = 1; i < numNodes; i++) {
+      pipeline.addMember(createDatanodeID());
+    }
     return pipeline;
   }
 
+  public static void initRatisConf(
+      RpcType rpc, Pipeline pipeline, Configuration conf) {
+    conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY, true);
+    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, rpc.name());
+    conf.setStrings(OzoneConfigKeys.DFS_CONTAINER_RATIS_CONF,
+        pipeline.getMachines().stream()
+            .map(dn -> dn.getXferAddr())
+            .collect(Collectors.joining(",")));
+  }
+
   /**
    * Creates a ChunkInfo for testing.
    *
@@ -133,6 +172,8 @@ public final class ContainerTestHelper {
   public static ContainerCommandRequestProto getWriteChunkRequest(
       Pipeline pipeline, String containerName, String keyName, int datalen)
       throws IOException, NoSuchAlgorithmException {
+    LOG.trace("writeChunk {} (key={}) to pipeline={}",
+        datalen, keyName, pipeline);
     ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
         ContainerProtos.WriteChunkRequestProto
             .newBuilder();
@@ -225,6 +266,9 @@ public final class ContainerTestHelper {
   public static ContainerCommandRequestProto getReadChunkRequest(
       ContainerProtos.WriteChunkRequestProto request)
       throws IOException, NoSuchAlgorithmException {
+    LOG.trace("readChunk key={} from pipeline={}",
+        request.getKeyName(), request.getPipeline());
+
     ContainerProtos.ReadChunkRequestProto.Builder readRequest =
         ContainerProtos.ReadChunkRequestProto.newBuilder();
 
@@ -252,6 +296,9 @@ public final class ContainerTestHelper {
       ContainerProtos.WriteChunkRequestProto writeRequest)
       throws
       IOException, NoSuchAlgorithmException {
+    LOG.trace("deleteChunk key={} from pipeline={}",
+        writeRequest.getKeyName(), writeRequest.getPipeline());
+
     ContainerProtos.DeleteChunkRequestProto.Builder deleteRequest =
         ContainerProtos.DeleteChunkRequestProto
             .newBuilder();
@@ -275,6 +322,8 @@ public final class ContainerTestHelper {
    */
   public static ContainerCommandRequestProto getCreateContainerRequest(
       String containerName) throws IOException {
+    LOG.trace("createContainer: {}", containerName);
+
     ContainerProtos.CreateContainerRequestProto.Builder createRequest =
         ContainerProtos.CreateContainerRequestProto
             .newBuilder();
@@ -358,6 +407,9 @@ public final class ContainerTestHelper {
    */
   public static ContainerCommandRequestProto getPutKeyRequest(
       ContainerProtos.WriteChunkRequestProto writeRequest) {
+    LOG.trace("putKey: {} to pipeline={}",
+        writeRequest.getKeyName(), writeRequest.getPipeline());
+
     ContainerProtos.PutKeyRequestProto.Builder putRequest =
         ContainerProtos.PutKeyRequestProto.newBuilder();
 
@@ -384,6 +436,9 @@ public final class ContainerTestHelper {
    */
   public static ContainerCommandRequestProto getKeyRequest(
       ContainerProtos.PutKeyRequestProto putKeyRequest) {
+    LOG.trace("getKey: name={} from pipeline={}",
+        putKeyRequest.getKeyData().getName(), putKeyRequest.getPipeline());
+
     ContainerProtos.GetKeyRequestProto.Builder getRequest =
         ContainerProtos.GetKeyRequestProto.newBuilder();
     ContainerProtos.KeyData.Builder keyData = ContainerProtos.KeyData
@@ -422,6 +477,9 @@ public final class ContainerTestHelper {
    */
   public static ContainerCommandRequestProto getDeleteKeyRequest(
       ContainerProtos.PutKeyRequestProto putKeyRequest) {
+    LOG.trace("deleteKey: name={} from pipeline={}",
+        putKeyRequest.getKeyData().getName(), putKeyRequest.getPipeline());
+
     ContainerProtos.DeleteKeyRequestProto.Builder delRequest =
         ContainerProtos.DeleteKeyRequestProto.newBuilder();
     delRequest.setPipeline(putKeyRequest.getPipeline());

+ 76 - 13
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java

@@ -25,7 +25,11 @@ import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.scm.XceiverClient;
+import org.apache.hadoop.scm.XceiverClientRatis;
+import org.apache.hadoop.scm.XceiverClientSpi;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.rpc.SupportedRpcType;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -88,20 +92,69 @@ public class TestOzoneContainer {
     }
   }
 
+  @Test
+  public void testOzoneContainerViaDataNodeRatisGrpc() throws Exception {
+    runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.GRPC, 1);
+    runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.GRPC, 3);
+  }
+
+  @Test
+  public void testOzoneContainerViaDataNodeRatisNetty() throws Exception {
+    runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.NETTY, 1);
+    runTestOzoneContainerViaDataNodeRatis(SupportedRpcType.NETTY, 3);
+  }
+
+  private static void runTestOzoneContainerViaDataNodeRatis(
+      RpcType rpc, int numNodes) throws Exception {
+    ContainerTestHelper.LOG.info("runTestOzoneContainerViaDataNodeRatis(rpc="
+        + rpc + ", numNodes=" + numNodes + ")");
+
+    final String containerName = OzoneUtils.getRequestID();
+    final Pipeline pipeline = ContainerTestHelper.createPipeline(
+        containerName, numNodes);
+    final OzoneConfiguration conf = initOzoneConfiguration(pipeline);
+    ContainerTestHelper.initRatisConf(rpc, pipeline, conf);
+
+    final MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
+        .setHandlerType("local")
+        .numDataNodes(pipeline.getMachines().size())
+        .build();
+    cluster.waitOzoneReady();
+    final XceiverClientSpi client = XceiverClientRatis.newXceiverClientRatis(
+        pipeline, conf);
+
+    try {
+      runTestOzoneContainerViaDataNode(containerName, pipeline, client);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private static OzoneConfiguration initOzoneConfiguration(Pipeline pipeline) {
+    final OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+        pipeline.getLeader().getContainerPort());
+
+    setOzoneLocalStorageRoot(conf);
+    return conf;
+  }
+
+  private static void setOzoneLocalStorageRoot(OzoneConfiguration conf) {
+    URL p = conf.getClass().getResource("");
+    String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
+    path += conf.getTrimmed(
+        OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
+        OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
+    conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
+  }
+
   @Test
   public void testOzoneContainerViaDataNode() throws Exception {
     MiniOzoneCluster cluster = null;
-    XceiverClient client = null;
     try {
-      String keyName = OzoneUtils.getRequestID();
       String containerName = OzoneUtils.getRequestID();
       OzoneConfiguration conf = new OzoneConfiguration();
-      URL p = conf.getClass().getResource("");
-      String path = p.getPath().concat(
-          TestOzoneContainer.class.getSimpleName());
-      path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
-          OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
-      conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
+      setOzoneLocalStorageRoot(conf);
 
       // Start ozone container Via Datanode create.
 
@@ -115,19 +168,32 @@ public class TestOzoneContainer {
           .setHandlerType("distributed").build();
 
       // This client talks to ozone container via datanode.
-      client = new XceiverClient(pipeline, conf);
+      XceiverClient client = new XceiverClient(pipeline, conf);
+
+      runTestOzoneContainerViaDataNode(containerName, pipeline, client);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  static void runTestOzoneContainerViaDataNode(
+      String containerName, Pipeline pipeline, XceiverClientSpi client)
+      throws Exception {
+    try {
       client.connect();
 
       // Create container
       ContainerProtos.ContainerCommandRequestProto request =
           ContainerTestHelper.getCreateContainerRequest(containerName);
-      pipeline.setContainerName(containerName);
       ContainerProtos.ContainerCommandResponseProto response =
           client.sendCommand(request);
       Assert.assertNotNull(response);
       Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
 
       // Write Chunk
+      final String keyName = OzoneUtils.getRequestID();
       ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
           ContainerTestHelper.getWriteChunkRequest(pipeline, containerName,
               keyName, 1024);
@@ -204,9 +270,6 @@ public class TestOzoneContainer {
       if (client != null) {
         client.close();
       }
-      if (cluster != null) {
-        cluster.shutdown();
-      }
     }
   }
 

+ 88 - 20
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java

@@ -22,29 +22,43 @@ import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
-import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
-import org.apache.hadoop.scm.XceiverClient;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerHandler;
-
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.XceiverClient;
+import org.apache.hadoop.scm.XceiverClientRatis;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.rpc.RpcType;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiConsumer;
 
+import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
+import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
 import static org.mockito.Mockito.mock;
 
 /**
  * Test Containers.
  */
 public class TestContainerServer {
+  static final String TEST_DIR
+      = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;
 
   @Test
   public void testPipeline() throws IOException {
@@ -73,33 +87,87 @@ public class TestContainerServer {
 
   @Test
   public void testClientServer() throws Exception {
-    XceiverServer server = null;
-    XceiverClient client = null;
+    runTestClientServer(1,
+        (pipeline, conf) -> conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+            pipeline.getLeader().getContainerPort()),
+        XceiverClient::new,
+        (dn, conf) -> new XceiverServer(conf, new TestContainerDispatcher()));
+  }
+
+  @FunctionalInterface
+  interface CheckedBiFunction<LEFT, RIGHT, OUT, THROWABLE extends Throwable> {
+    OUT apply(LEFT left, RIGHT right) throws THROWABLE;
+  }
+
+  @Test
+  public void testClientServerRatisNetty() throws Exception {
+    runTestClientServerRatis(NETTY, 1);
+    runTestClientServerRatis(NETTY, 3);
+  }
+
+  @Test
+  public void testClientServerRatisGrpc() throws Exception {
+    runTestClientServerRatis(GRPC, 1);
+    runTestClientServerRatis(GRPC, 3);
+  }
+
+  static XceiverServerRatis newXceiverServerRatis(
+      DatanodeID dn, OzoneConfiguration conf) throws IOException {
+    final String id = dn.getXferAddr();
+    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_ADDRESS, id);
+    final String dir = TEST_DIR + id.replace(':', '_');
+    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
+
+    final ContainerDispatcher dispatcher = new TestContainerDispatcher();
+    return XceiverServerRatis.newXceiverServerRatis(conf, dispatcher);
+  }
+
+  static void runTestClientServerRatis(RpcType rpc, int numNodes)
+      throws Exception {
+    runTestClientServer(numNodes,
+        (pipeline, conf) -> ContainerTestHelper.initRatisConf(
+            rpc, pipeline, conf),
+        XceiverClientRatis::newXceiverClientRatis,
+        TestContainerServer::newXceiverServerRatis);
+  }
+
+  static void runTestClientServer(
+      int numDatanodes,
+      BiConsumer<Pipeline, OzoneConfiguration> initConf,
+      CheckedBiFunction<Pipeline, OzoneConfiguration, XceiverClientSpi,
+          IOException> createClient,
+      CheckedBiFunction<DatanodeID, OzoneConfiguration, XceiverServerSpi,
+          IOException> createServer)
+      throws Exception {
+    final List<XceiverServerSpi> servers = new ArrayList<>();
+    XceiverClientSpi client = null;
     String containerName = OzoneUtils.getRequestID();
     try {
-      Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(
-          containerName);
-      OzoneConfiguration conf = new OzoneConfiguration();
-      conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
-          pipeline.getLeader().getContainerPort());
-
-      server = new XceiverServer(conf, new TestContainerDispatcher());
-      client = new XceiverClient(pipeline, conf);
+      final Pipeline pipeline = ContainerTestHelper.createPipeline(
+          containerName, numDatanodes);
+      final OzoneConfiguration conf = new OzoneConfiguration();
+      initConf.accept(pipeline, conf);
+
+      for(DatanodeID dn : pipeline.getMachines()) {
+        final XceiverServerSpi s = createServer.apply(dn, conf);
+        servers.add(s);
+        s.start();
+      }
 
-      server.start();
+      client = createClient.apply(pipeline, conf);
       client.connect();
 
-      ContainerCommandRequestProto request =
+      final ContainerCommandRequestProto request =
           ContainerTestHelper.getCreateContainerRequest(containerName);
+      Assert.assertNotNull(request.getTraceID());
+
       ContainerCommandResponseProto response = client.sendCommand(request);
-      Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
+      Assert.assertEquals(request.getTraceID(), response.getTraceID());
     } finally {
       if (client != null) {
         client.close();
       }
-      if (server != null) {
-        server.stop();
-      }
+      servers.stream().forEach(XceiverServerSpi::stop);
     }
   }
 

+ 43 - 1
hadoop-project/pom.xml

@@ -93,6 +93,9 @@
     <apacheds.version>2.0.0-M21</apacheds.version>
     <ldap-api.version>1.0.0-M33</ldap-api.version>
 
+    <!-- Apache Ratis version -->
+    <ratis.version>0.1-SNAPSHOT</ratis.version>
+
     <!-- define the Java language version used by the compiler -->
     <javac.version>1.8</javac.version>
 
@@ -710,6 +713,43 @@
         </exclusions>
       </dependency>
 
+      <dependency>
+        <groupId>org.jctools</groupId>
+        <artifactId>jctools-core</artifactId>
+        <version>1.2.1</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.ratis</groupId>
+        <artifactId>ratis-proto-shaded</artifactId>
+        <version>${ratis.version}</version>
+      </dependency>
+      <dependency>
+        <artifactId>ratis-common</artifactId>
+        <groupId>org.apache.ratis</groupId>
+        <version>${ratis.version}</version>
+      </dependency>
+      <dependency>
+        <artifactId>ratis-client</artifactId>
+        <groupId>org.apache.ratis</groupId>
+        <version>${ratis.version}</version>
+      </dependency>
+      <dependency>
+        <artifactId>ratis-server</artifactId>
+        <groupId>org.apache.ratis</groupId>
+        <version>${ratis.version}</version>
+      </dependency>
+      <dependency>
+        <artifactId>ratis-netty</artifactId>
+        <groupId>org.apache.ratis</groupId>
+        <version>${ratis.version}</version>
+      </dependency>
+      <dependency>
+        <artifactId>ratis-grpc</artifactId>
+        <groupId>org.apache.ratis</groupId>
+        <version>${ratis.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty</artifactId>
@@ -1505,7 +1545,9 @@
             <configuration>
               <rules>
                 <DependencyConvergence>
-                  <uniqueVersions>true</uniqueVersions>
+                  <!--
+                  <uniqueversions>true</uniqueversions>
+                  -->
                 </DependencyConvergence>
                 <bannedDependencies>
                   <excludes>