Forráskód Böngészése

HDDS-915. Submit client request to OM Ratis server.
Contributed by Hanisha Koneru.

Anu Engineer 6 éve
szülő
commit
999da98d67

+ 23 - 1
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -1543,12 +1543,34 @@
   </property>
 
   <property>
-    <name>ozone.om.ratis.client.request.timeout</name>
+    <name>ozone.om.ratis.client.request.timeout.duration</name>
     <value>3s</value>
     <tag>OZONE, OM, RATIS, MANAGEMENT</tag>
     <description>The timeout duration for OM Ratis client request.
     </description>
   </property>
+	<property>
+		<name>ozone.om.ratis.client.request.max.retries</name>
+		<value>180</value>
+		<tag>OZONE, OM, RATIS, MANAGEMENT</tag>
+		<description>Number of retries for OM client request.</description>
+	</property>
+	<property>
+		<name>ozone.om.ratis.client.request.retry.interval</name>
+		<value>100ms</value>
+		<tag>OZONE, OM, RATIS, MANAGEMENT</tag>
+		<description>Interval between successive retries for a OM client request.
+		</description>
+	</property>
+
+	<property>
+		<name>ozone.om.leader.election.minimum.timeout.duration</name>
+		<value>1s</value>
+		<tag>OZONE, OM, RATIS, MANAGEMENT</tag>
+		<description>The minimum timeout duration for OM ratis leader election.
+			Default is 1s.
+		</description>
+	</property>
 
   <property>
     <name>ozone.acl.authorizer.class</name>

+ 45 - 1
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Optional;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.net.NetUtils;
 
 import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +46,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_PORT_DEFAULT;
  * communication.
  */
 public final class OmUtils {
-  private static final Logger LOG = LoggerFactory.getLogger(OmUtils.class);
+  public static final Logger LOG = LoggerFactory.getLogger(OmUtils.class);
 
   private OmUtils() {
   }
@@ -133,4 +135,46 @@ public final class OmUtils {
         OMConfigKeys.OZONE_OM_DB_DIRS, HddsConfigKeys.OZONE_METADATA_DIRS);
     return ServerUtils.getOzoneMetaDirPath(conf);
   }
+
+  /**
+   * Checks if the OM request is read only or not.
+   * @param omRequest OMRequest proto
+   * @return True if its readOnly, false otherwise.
+   */
+  public static boolean isReadOnly(
+      OzoneManagerProtocolProtos.OMRequest omRequest) {
+    OzoneManagerProtocolProtos.Type cmdType = omRequest.getCmdType();
+    switch (cmdType) {
+    case CheckVolumeAccess:
+    case InfoVolume:
+    case ListVolume:
+    case InfoBucket:
+    case ListBuckets:
+    case LookupKey:
+    case ListKeys:
+    case InfoS3Bucket:
+    case ListS3Buckets:
+    case ServiceList:
+      return true;
+    case CreateVolume:
+    case SetVolumeProperty:
+    case DeleteVolume:
+    case CreateBucket:
+    case SetBucketProperty:
+    case DeleteBucket:
+    case CreateKey:
+    case RenameKey:
+    case DeleteKey:
+    case CommitKey:
+    case AllocateBlock:
+    case CreateS3Bucket:
+    case DeleteS3Bucket:
+    case InitiateMultiPartUpload:
+    case CommitMultiPartUpload:
+      return false;
+    default:
+      LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType);
+      return false;
+    }
+  }
 }

+ 20 - 3
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java

@@ -153,9 +153,26 @@ public final class OMConfigKeys {
       = TimeDuration.valueOf(1, TimeUnit.SECONDS);
 
   // OM Ratis client configurations
-  public static final String OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_KEY
-      = "ozone.om.ratis.client.request.timeout";
+  public static final String OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY
+      = "ozone.om.ratis.client.request.timeout.duration";
   public static final TimeDuration
-      OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DEFAULT
+      OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
       = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
+  public static final String OZONE_OM_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY
+      = "ozone.om.ratis.client.request.max.retries";
+  public static final int OZONE_OM_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT
+      = 180;
+  public static final String OZONE_OM_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY
+      = "ozone.om.ratis.client.request.retry.interval";
+  public static final TimeDuration
+      OZONE_OM_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT
+      = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+
+  // OM Ratis Leader Election configurations
+  public static final String
+      OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY =
+      "ozone.om.leader.election.minimum.timeout.duration";
+  public static final TimeDuration
+      OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT =
+      TimeDuration.valueOf(1, TimeUnit.SECONDS);
 }

+ 47 - 16
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -68,9 +68,12 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
@@ -134,7 +137,9 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys
     .OZONE_OM_METRICS_SAVE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OMConfigKeys
     .OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService.newReflectiveBlockingService;
+import static org.apache.hadoop.ozone.protocol.proto
+    .OzoneManagerProtocolProtos.OzoneManagerService
+    .newReflectiveBlockingService;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 /**
@@ -156,7 +161,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private final OzoneConfiguration configuration;
   private RPC.Server omRpcServer;
   private InetSocketAddress omRpcAddress;
+  private String omId;
   private OzoneManagerRatisServer omRatisServer;
+  private OzoneManagerRatisClient omRatisClient;
   private final OMMetadataManager metadataManager;
   private final VolumeManager volumeManager;
   private final BucketManager bucketManager;
@@ -185,6 +192,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     Preconditions.checkNotNull(conf);
     configuration = conf;
     omStorage = new OMStorage(conf);
+    omId = omStorage.getOmId();
     scmBlockClient = getScmBlockClient(configuration);
     scmContainerClient = getScmContainerClient(configuration);
     if (omStorage.getState() != StorageState.INITIALIZED) {
@@ -584,35 +592,42 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     InetSocketAddress omNodeRpcAddr = getOmAddress(configuration);
     int handlerCount = configuration.getInt(OZONE_OM_HANDLER_COUNT_KEY,
         OZONE_OM_HANDLER_COUNT_DEFAULT);
-    BlockingService omService = newReflectiveBlockingService(
-        new OzoneManagerProtocolServerSideTranslatorPB(this));
-    omRpcServer = startRpcServer(configuration, omNodeRpcAddr,
-        OzoneManagerProtocolPB.class, omService,
-        handlerCount);
-    omRpcAddress = updateRPCListenAddress(configuration,
-        OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
-    omRpcServer.start();
-
-    LOG.info(buildRpcServerStartMessage("OzoneManager RPC server",
-        omRpcAddress));
 
+    // This is a temporary check. Once fully implemented, all OM state change
+    // should go through Ratis - either standalone (for non-HA) or replicated
+    // (for HA).
     boolean omRatisEnabled = configuration.getBoolean(
         OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
         OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
-    // This is a temporary check. Once fully implemented, all OM state change
-    // should go through Ratis, either standalone (for non-HA) or replicated
-    // (for HA).
     if (omRatisEnabled) {
       omRatisServer = OzoneManagerRatisServer.newOMRatisServer(
-          omStorage.getOmId(), configuration);
+          omId, omRpcAddress.getAddress(), configuration);
       omRatisServer.start();
 
       LOG.info("OzoneManager Ratis server started at port {}",
           omRatisServer.getServerPort());
+
+      omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
+          omId, omRatisServer.getRaftGroup(), configuration);
+      omRatisClient.connect();
     } else {
       omRatisServer = null;
+      omRatisClient = null;
     }
 
+    BlockingService omService = newReflectiveBlockingService(
+        new OzoneManagerProtocolServerSideTranslatorPB(
+            this, omRatisClient, omRatisEnabled));
+    omRpcServer = startRpcServer(configuration, omNodeRpcAddr,
+        OzoneManagerProtocolPB.class, omService,
+        handlerCount);
+    omRpcAddress = updateRPCListenAddress(configuration,
+        OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
+    omRpcServer.start();
+
+    LOG.info(buildRpcServerStartMessage("OzoneManager RPC server",
+        omRpcAddress));
+
     DefaultMetricsSystem.initialize("OzoneManager");
 
     metadataManager.start(configuration);
@@ -687,6 +702,22 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
   }
 
+  /**
+   * Validates that the incoming OM request has required parameters.
+   * TODO: Add more validation checks before writing the request to Ratis log.
+   * @param omRequest client request to OM
+   * @throws OMException thrown if required parameters are set to null.
+   */
+  public void validateRequest(OMRequest omRequest) throws OMException {
+    Type cmdType = omRequest.getCmdType();
+    if (cmdType == null) {
+      throw new OMException("CmdType is null", ResultCodes.INVALID_REQUEST);
+    }
+    if (omRequest.getClientId() == null) {
+      throw new OMException("ClientId is null", ResultCodes.INVALID_REQUEST);
+    }
+  }
+
   /**
    * Creates a volume.
    *

+ 2 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java

@@ -118,6 +118,7 @@ public class OMException extends IOException {
     S3_BUCKET_NOT_FOUND,
     INITIATE_MULTIPART_UPLOAD_FAILED,
     NO_SUCH_MULTIPART_UPLOAD,
-    UPLOAD_PART_FAILED;
+    UPLOAD_PART_FAILED,
+    INVALID_REQUEST;
   }
 }

+ 121 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java

@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.SizeInBytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ratis helper methods for OM Ratis server and client.
+ */
+public class OMRatisHelper {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      OMRatisHelper.class);
+
+  private OMRatisHelper() {
+  }
+
+  /**
+   * Creates a new RaftClient object.
+   * @param rpcType Replication Type
+   * @param omId OM id of the client
+   * @param group RaftGroup
+   * @param retryPolicy Retry policy
+   * @return RaftClient object
+   */
+  static RaftClient newRaftClient(RpcType rpcType, String omId, RaftGroup
+      group, RetryPolicy retryPolicy,   Configuration conf) {
+    LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, omId, group);
+    final RaftProperties properties = new RaftProperties();
+    RaftConfigKeys.Rpc.setType(properties, rpcType);
+
+    final int raftSegmentPreallocatedSize = (int) conf.getStorageSize(
+        OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
+        OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
+        StorageUnit.BYTES);
+    GrpcConfigKeys.setMessageSizeMax(
+        properties, SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+
+    return RaftClient.newBuilder()
+        .setRaftGroup(group)
+        .setLeaderId(getRaftPeerId(omId))
+        .setProperties(properties)
+        .setRetryPolicy(retryPolicy)
+        .build();
+  }
+
+  static RaftPeerId getRaftPeerId(String omId) {
+    return RaftPeerId.valueOf(omId);
+  }
+
+  static ByteString convertRequestToByteString(OMRequest request) {
+    byte[] requestBytes = request.toByteArray();
+    return ByteString.copyFrom(requestBytes);
+  }
+
+  static OMRequest convertByteStringToOMRequest(ByteString byteString)
+      throws InvalidProtocolBufferException {
+    byte[] bytes = byteString.toByteArray();
+    return OMRequest.parseFrom(bytes);
+  }
+
+  static ByteString convertResponseToByteString(OMResponse response) {
+    byte[] requestBytes = response.toByteArray();
+    return ByteString.copyFrom(requestBytes);
+  }
+
+  static OMResponse convertByteStringToOMResponse(ByteString byteString)
+      throws InvalidProtocolBufferException {
+    byte[] bytes = byteString.toByteArray();
+    return OMResponse.parseFrom(bytes);
+  }
+
+  static OMResponse getErrorResponse(Type cmdType, Exception e) {
+    return OMResponse.newBuilder()
+        .setCmdType(cmdType)
+        .setSuccess(false)
+        .setMessage(e.getMessage())
+        .build();
+  }
+
+  static <T> CompletableFuture<T> completeExceptionally(Exception e) {
+    final CompletableFuture<T> future = new CompletableFuture<>();
+    future.completeExceptionally(e);
+    return future;
+  }
+}

+ 194 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java

@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftRetryFailureException;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OM Ratis client to interact with OM Ratis server endpoint.
+ */
+public final class OzoneManagerRatisClient implements Closeable {
+  static final Logger LOG = LoggerFactory.getLogger(
+      OzoneManagerRatisClient.class);
+
+  private final RaftGroup raftGroup;
+  private final String omID;
+  private final RpcType rpcType;
+  private final AtomicReference<RaftClient> client = new AtomicReference<>();
+  private final RetryPolicy retryPolicy;
+  private final Configuration conf;
+
+  private OzoneManagerRatisClient(String omId, RaftGroup raftGroup,
+      RpcType rpcType, RetryPolicy retryPolicy,
+      Configuration config) {
+    this.raftGroup = raftGroup;
+    this.omID = omId;
+    this.rpcType = rpcType;
+    this.retryPolicy = retryPolicy;
+    this.conf = config;
+  }
+
+  public static OzoneManagerRatisClient newOzoneManagerRatisClient(
+      String omId, RaftGroup raftGroup, Configuration conf) {
+    final String rpcType = conf.get(
+        OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_KEY,
+        OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT);
+
+    final int maxRetryCount = conf.getInt(
+        OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY,
+        OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT);
+    final long retryInterval = conf.getTimeDuration(
+        OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
+        OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT
+            .toIntExact(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+    final TimeDuration sleepDuration = TimeDuration.valueOf(
+        retryInterval, TimeUnit.MILLISECONDS);
+    final RetryPolicy retryPolicy = RetryPolicies
+        .retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration);
+
+    return new OzoneManagerRatisClient(omId, raftGroup,
+        SupportedRpcType.valueOfIgnoreCase(rpcType), retryPolicy, conf);
+  }
+
+  public void connect() {
+    LOG.debug("Connecting to OM Ratis Server GroupId:{} OM:{}",
+        raftGroup.getGroupId().getUuid().toString(), omID);
+
+    // TODO : XceiverClient ratis should pass the config value of
+    // maxOutstandingRequests so as to set the upper bound on max no of async
+    // requests to be handled by raft client
+
+    if (!client.compareAndSet(null, OMRatisHelper.newRaftClient(
+        rpcType, omID, raftGroup, retryPolicy, conf))) {
+      throw new IllegalStateException("Client is already connected.");
+    }
+  }
+
+  @Override
+  public void close() {
+    final RaftClient c = client.getAndSet(null);
+    if (c != null) {
+      closeRaftClient(c);
+    }
+  }
+
+  private void closeRaftClient(RaftClient raftClient) {
+    try {
+      raftClient.close();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private RaftClient getClient() {
+    return Objects.requireNonNull(client.get(), "client is null");
+  }
+
+  /**
+   * Sends a given request to server and gets the reply back.
+   * @param request Request
+   * @return Response to the command
+   */
+  public OMResponse sendCommand(OMRequest request) {
+    try {
+      CompletableFuture<OMResponse> reply = sendCommandAsync(request);
+      return reply.get();
+    } catch (ExecutionException | InterruptedException e) {
+      LOG.error("Failed to execute command: " + request, e);
+      return OMRatisHelper.getErrorResponse(request.getCmdType(), e);
+    }
+  }
+
+  /**
+   * Sends a given command to server gets a waitable future back.
+   *
+   * @param request Request
+   * @return Response to the command
+   */
+  private CompletableFuture<OMResponse> sendCommandAsync(OMRequest request) {
+    CompletableFuture<RaftClientReply> raftClientReply =
+        sendRequestAsync(request);
+
+    CompletableFuture<OMResponse> omRatisResponse =
+        raftClientReply.whenComplete((reply, e) -> LOG.debug(
+            "received reply {} for request: cmdType={} traceID={} " +
+                "exception: {}", reply, request.getCmdType(),
+            request.getTraceID(), e))
+            .thenApply(reply -> {
+              try {
+                // we need to handle RaftRetryFailure Exception
+                RaftRetryFailureException raftRetryFailureException =
+                    reply.getRetryFailureException();
+                if (raftRetryFailureException != null) {
+                  throw new CompletionException(raftRetryFailureException);
+                }
+                OMResponse response = OMRatisHelper
+                    .convertByteStringToOMResponse(reply.getMessage()
+                        .getContent());
+                return response;
+              } catch (InvalidProtocolBufferException e) {
+                throw new CompletionException(e);
+              }
+            });
+    return omRatisResponse;
+  }
+
+  /**
+   * Submits {@link RaftClient#sendReadOnlyAsync(Message)} request to Ratis
+   * server if the request is readOnly. Otherwise, submits
+   * {@link RaftClient#sendAsync(Message)} request.
+   * @param request OMRequest
+   * @return RaftClient response
+   */
+  private CompletableFuture<RaftClientReply> sendRequestAsync(
+      OMRequest request) {
+    boolean isReadOnlyRequest = OmUtils.isReadOnly(request);
+    ByteString byteString = OMRatisHelper.convertRequestToByteString(request);
+    LOG.debug("sendOMRequestAsync {} {}", isReadOnlyRequest, request);
+    return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) :
+        getClient().sendAsync(() -> byteString);
+  }
+}

+ 60 - 19
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java

@@ -22,12 +22,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
@@ -38,7 +40,9 @@ import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
@@ -59,23 +63,43 @@ public final class OzoneManagerRatisServer {
       .getLogger(OzoneManagerRatisServer.class);
 
   private final int port;
+  private final InetSocketAddress omRatisAddress;
   private final RaftServer server;
+  private final RaftGroupId raftGroupId;
+  private final RaftGroup raftGroup;
+  private final RaftPeerId raftPeerId;
 
-  private OzoneManagerRatisServer(String omId, int port, Configuration conf)
-      throws IOException {
+  private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
+
+  private static long nextCallId() {
+    return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
+  }
+
+  private OzoneManagerRatisServer(String omId, InetAddress addr, int port,
+      Configuration conf) throws IOException {
     Objects.requireNonNull(omId, "omId == null");
     this.port = port;
+    this.omRatisAddress = new InetSocketAddress(addr.getHostAddress(), port);
     RaftProperties serverProperties = newRaftProperties(conf);
 
+    // TODO: When implementing replicated OM ratis servers, RaftGroupID
+    // should be the same across all the OMs. Add all the OM servers as Raft
+    // Peers.
+    this.raftGroupId = RaftGroupId.randomId();
+    this.raftPeerId = RaftPeerId.getRaftPeerId(omId);
+
+    RaftPeer raftPeer = new RaftPeer(raftPeerId, omRatisAddress);
+    this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeer);
     this.server = RaftServer.newBuilder()
-        .setServerId(RaftPeerId.valueOf(omId))
+        .setServerId(this.raftPeerId)
+        .setGroup(this.raftGroup)
         .setProperties(serverProperties)
-        .setStateMachineRegistry(this::getStateMachine)
+        .setStateMachine(getStateMachine(this.raftGroupId))
         .build();
   }
 
   public static OzoneManagerRatisServer newOMRatisServer(String omId,
-      Configuration ozoneConf) throws IOException {
+      InetAddress omAddress, Configuration ozoneConf) throws IOException {
     int localPort = ozoneConf.getInt(
         OMConfigKeys.OZONE_OM_RATIS_PORT_KEY,
         OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT);
@@ -96,7 +120,11 @@ public final class OzoneManagerRatisServer {
             + "fallback to use default port {}", localPort, e);
       }
     }
-    return new OzoneManagerRatisServer(omId, localPort, ozoneConf);
+    return new OzoneManagerRatisServer(omId, omAddress, localPort, ozoneConf);
+  }
+
+  public RaftGroup getRaftGroup() {
+    return this.raftGroup;
   }
 
   /**
@@ -104,7 +132,7 @@ public final class OzoneManagerRatisServer {
    * TODO: Implement a state machine on OM.
    */
   private BaseStateMachine getStateMachine(RaftGroupId gid) {
-    return new BaseStateMachine();
+    return  new OzoneManagerStateMachine(null);
   }
 
   public void start() throws IOException {
@@ -163,8 +191,8 @@ public final class OzoneManagerRatisServer {
         OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
         OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
         StorageUnit.BYTES);
-    RaftServerConfigKeys.Log.Appender
-        .setBufferElementLimit(properties, logAppenderQueueNumElements);
+    RaftServerConfigKeys.Log.Appender.setBufferElementLimit(properties,
+        logAppenderQueueNumElements);
     RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties,
         SizeInBytes.valueOf(logAppenderQueueByteLimit));
     RaftServerConfigKeys.Log.setPreallocatedSize(properties,
@@ -197,8 +225,8 @@ public final class OzoneManagerRatisServer {
             .getDuration(), retryCacheTimeoutUnit);
     final TimeDuration retryCacheTimeout = TimeDuration.valueOf(
         retryCacheTimeoutDuration, retryCacheTimeoutUnit);
-    RaftServerConfigKeys.RetryCache
-        .setExpiryTime(properties, retryCacheTimeout);
+    RaftServerConfigKeys.RetryCache.setExpiryTime(properties,
+        retryCacheTimeout);
 
     // Set the server min and max timeout
     TimeUnit serverMinTimeoutUnit =
@@ -222,11 +250,11 @@ public final class OzoneManagerRatisServer {
     RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
 
     // Set the client request timeout
-    TimeUnit clientRequestTimeoutUnit =
-        OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DEFAULT.getUnit();
+    TimeUnit clientRequestTimeoutUnit = OMConfigKeys
+        .OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT .getUnit();
     long clientRequestTimeoutDuration = conf.getTimeDuration(
-        OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_KEY,
-        OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DEFAULT
+        OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
+        OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
             .getDuration(), clientRequestTimeoutUnit);
     final TimeDuration clientRequestTimeout = TimeDuration.valueOf(
         clientRequestTimeoutDuration, clientRequestTimeoutUnit);
@@ -243,10 +271,24 @@ public final class OzoneManagerRatisServer {
     /**
      * TODO: set following ratis leader election related configs when
      * replicated ratis server is implemented.
-     * 1. leader election timeout
-     * 2. node failure timeout
-     * 3.
+     * 1. node failure timeout
      */
+    // Set the ratis leader election timeout
+    TimeUnit leaderElectionMinTimeoutUnit =
+        OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
+            .getUnit();
+    long leaderElectionMinTimeoutduration = conf.getTimeDuration(
+        OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+        OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
+            .getDuration(), leaderElectionMinTimeoutUnit);
+    final TimeDuration leaderElectionMinTimeout = TimeDuration.valueOf(
+        leaderElectionMinTimeoutduration, leaderElectionMinTimeoutUnit);
+    RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
+        leaderElectionMinTimeout);
+    long leaderElectionMaxTimeout = leaderElectionMinTimeout.toLong(
+        TimeUnit.MILLISECONDS) + 200;
+    RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
+        TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
 
     /**
      * TODO: when ratis snapshots are implemented, set snapshot threshold and
@@ -276,5 +318,4 @@ public final class OzoneManagerRatisServer {
     }
     return storageDir;
   }
-
 }

+ 90 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java

@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+    .ContainerStateMachine;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.*;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The OM StateMachine is the state machine for OM Ratis server. It is
+ * responsible for applying ratis committed transactions to
+ * {@link OzoneManager}.
+ */
+public class OzoneManagerStateMachine extends BaseStateMachine {
+
+  static final Logger LOG =
+      LoggerFactory.getLogger(ContainerStateMachine.class);
+  private final SimpleStateMachineStorage storage =
+      new SimpleStateMachineStorage();
+  private final OzoneManager ozoneManager;
+
+  public OzoneManagerStateMachine(OzoneManager om) {
+    // OzoneManager is required when implementing StateMachine
+    this.ozoneManager = om;
+  }
+
+  /**
+   * Initializes the State Machine with the given server, group and storage.
+   * TODO: Load the latest snapshot from the file system.
+   */
+  @Override
+  public void initialize(
+      RaftServer server, RaftGroupId id, RaftStorage raftStorage)
+      throws IOException {
+    super.initialize(server, id, raftStorage);
+    storage.init(raftStorage);
+  }
+
+  /*
+   * Apply a committed log entry to the state machine. This function
+   * currently returns a dummy message.
+   * TODO: Apply transaction to OM state machine
+   */
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    String errorMessage;
+    ByteString logData = trx.getStateMachineLogEntry().getLogData();
+    try {
+      OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest(logData);
+      LOG.debug("Received request: cmdType={} traceID={} ",
+          omRequest.getCmdType(), omRequest.getTraceID());
+      errorMessage = "Dummy response from Ratis server for command type: " +
+          omRequest.getCmdType();
+    } catch (InvalidProtocolBufferException e) {
+      errorMessage = e.getMessage();
+    }
+
+    // TODO: When State Machine is implemented, send the actual response back
+    return OMRatisHelper.completeExceptionally(new IOException(errorMessage));
+  }
+}

+ 27 - 3
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .AllocateBlockRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -160,6 +161,8 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
   private static final Logger LOG = LoggerFactory
       .getLogger(OzoneManagerProtocolServerSideTranslatorPB.class);
   private final OzoneManagerProtocol impl;
+  private final OzoneManagerRatisClient omRatisClient;
+  private final boolean isRatisEnabled;
 
   /**
    * Constructs an instance of the server handler.
@@ -167,8 +170,11 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
    * @param impl OzoneManagerProtocolPB
    */
   public OzoneManagerProtocolServerSideTranslatorPB(
-      OzoneManagerProtocol impl) {
+      OzoneManagerProtocol impl, OzoneManagerRatisClient ratisClient,
+      boolean enableRatis) {
     this.impl = impl;
+    this.omRatisClient = ratisClient;
+    this.isRatisEnabled = enableRatis;
   }
 
   /**
@@ -179,10 +185,29 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
   @Override
    public OMResponse submitRequest(RpcController controller,
       OMRequest request) throws ServiceException {
-    Type cmdType = request.getCmdType();
+    if (isRatisEnabled) {
+      return submitRequestToRatis(request);
+    } else {
+      return submitRequestToOM(request);
+    }
+  }
+
+  /**
+   * Submits request to OM's Ratis server.
+   */
+  private OMResponse submitRequestToRatis(OMRequest request) {
+    return omRatisClient.sendCommand(request);
+  }
 
+  /**
+   * Submits request directly to OM.
+   */
+  private OMResponse submitRequestToOM(OMRequest request)
+      throws ServiceException {
+    Type cmdType = request.getCmdType();
     OMResponse.Builder responseBuilder = OMResponse.newBuilder()
         .setCmdType(cmdType);
+
     switch (cmdType) {
     case CreateVolume:
       CreateVolumeResponse createVolumeResponse = createVolume(
@@ -318,7 +343,6 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
     }
     return responseBuilder.build();
   }
-
   // Convert and exception to corresponding status code
   private Status exceptionToResponseStatus(IOException ex) {
     if (ex instanceof OMException) {

+ 81 - 5
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java

@@ -18,35 +18,58 @@
 
 package org.apache.hadoop.ozone.om.ratis;
 
+import java.net.InetAddress;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertFalse;
 
 /**
  * Test OM Ratis server.
  */
 public class TestOzoneManagerRatisServer {
 
-  private Configuration conf;
+  private OzoneConfiguration conf;
   private OzoneManagerRatisServer omRatisServer;
+  private OzoneManagerRatisClient omRatisClient;
   private String omID;
+  private String clientId = UUID.randomUUID().toString();
+  private static final long LEADER_ELECTION_TIMEOUT = 500L;
 
   @Before
-  public void init() {
+  public void init() throws Exception {
     conf = new OzoneConfiguration();
     omID = UUID.randomUUID().toString();
     final String path = GenericTestUtils.getTempPath(omID);
     Path metaDirPath = Paths.get(path, "om-meta");
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDirPath.toString());
+    conf.setTimeDuration(
+        OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+        LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    omRatisServer = OzoneManagerRatisServer.newOMRatisServer(omID,
+        InetAddress.getLocalHost(), conf);
+    omRatisServer.start();
+    omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(omID,
+        omRatisServer.getRaftGroup(), conf);
+    omRatisClient.connect();
   }
 
   @After
@@ -54,6 +77,9 @@ public class TestOzoneManagerRatisServer {
     if (omRatisServer != null) {
       omRatisServer.stop();
     }
+    if (omRatisClient != null) {
+      omRatisClient.close();
+    }
   }
 
   /**
@@ -61,9 +87,59 @@ public class TestOzoneManagerRatisServer {
    */
   @Test
   public void testStartOMRatisServer() throws Exception {
-    omRatisServer = OzoneManagerRatisServer.newOMRatisServer(omID, conf);
-    omRatisServer.start();
     Assert.assertEquals("Ratis Server should be in running state",
         LifeCycle.State.RUNNING, omRatisServer.getServerState());
   }
+
+  /**
+   * Submit any request to OM Ratis server and check that the dummy response
+   * message is received.
+   * TODO: Once state machine is implemented, submitting a request to Ratis
+   * server should result in a valid response.
+   */
+  @Test
+  public void testSubmitRatisRequest() throws Exception {
+    // Wait for leader election
+    Thread.sleep(LEADER_ELECTION_TIMEOUT * 2);
+
+    OMRequest request = OMRequest.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
+        .setClientId(clientId)
+        .build();
+
+    OMResponse response = omRatisClient.sendCommand(request);
+
+    // Since the state machine is not implemented yet, we should get the
+    // configured dummy message from Ratis.
+    Assert.assertEquals(false, response.getSuccess());
+    Assert.assertTrue(response.getMessage().contains("Dummy response from " +
+        "Ratis server for command type: " +
+        OzoneManagerProtocolProtos.Type.CreateVolume));
+    Assert.assertEquals(false, response.hasCreateVolumeResponse());
+  }
+
+  /**
+   * Test that all of {@link OzoneManagerProtocolProtos.Type} enum values are
+   * categorized in {@link OmUtils#isReadOnly(OMRequest)}.
+   */
+  @Test
+  public void testIsReadOnlyCapturesAllCmdTypeEnums() throws Exception {
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(LoggerFactory.getLogger(OmUtils.class));
+    String clientId = UUID.randomUUID().toString();
+    OzoneManagerProtocolProtos.Type[] cmdTypes =
+        OzoneManagerProtocolProtos.Type.values();
+
+    for (OzoneManagerProtocolProtos.Type cmdtype : cmdTypes) {
+      OMRequest request = OMRequest.newBuilder()
+          .setCmdType(cmdtype)
+          .setClientId(clientId)
+          .build();
+      OmUtils.isReadOnly(request);
+      assertFalse(cmdtype + "is not categorized in OmUtils#isReadyOnly",
+          logCapturer.getOutput().contains("CmdType " + cmdtype +" is not " +
+              "categorized as readOnly or not."));
+      logCapturer.clearOutput();
+    }
+  }
 }