Browse Source

HDDS-972. Add support for configuring multiple OMs. Contributed by Hanisha Koneru.

Bharat Viswanadham 6 years ago
parent
commit
917ac9f108
17 changed files with 1427 additions and 157 deletions
  1. 3 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
  2. 75 31
      hadoop-hdds/common/src/main/resources/ozone-default.xml
  3. 65 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
  4. 40 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneIllegalArgumentException.java
  5. 12 5
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
  6. 23 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
  7. 22 8
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
  8. 261 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
  9. 2 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
  10. 1 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java
  11. 290 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java
  12. 156 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
  13. 107 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMNodeDetails.java
  14. 252 33
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
  15. 11 24
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
  16. 88 51
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
  17. 19 4
      hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java

+ 3 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java

@@ -268,4 +268,7 @@ public final class OzoneConsts {
       Metadata.Key.of(OZONE_BLOCK_TOKEN, ASCII_STRING_MARSHALLER);
   public static final Metadata.Key<String> USER_METADATA_KEY =
       Metadata.Key.of(OZONE_USER, ASCII_STRING_MARSHALLER);
+
+  // Default OMServiceID for OM Ratis servers to use as RaftGroupId
+  public static final String OM_SERVICE_ID_DEFAULT = "om-service-value";
 }

+ 75 - 31
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -454,6 +454,48 @@
       and DataNode.
     </description>
   </property>
+  <property>
+    <name>ozone.om.service.ids</name>
+    <value></value>
+    <tag>OM, HA</tag>
+    <description>
+      Comma-separated list of OM service Ids.
+
+      If not set, the default value of "om-service-value" is assigned as the
+      OM service ID.
+    </description>
+  </property>
+  <property>
+    <name>ozone.om.nodes.EXAMPLEOMSERVICEID</name>
+    <value></value>
+    <tag>OM, HA</tag>
+    <description>
+      Comma-separated list of OM node Ids for a given OM service ID (e.g.
+      EXAMPLEOMSERVICEID). The OM service ID should be the value (one of the
+      values if there are multiple) set for the parameter ozone.om.service.ids.
+
+      Unique identifiers for each OM Node, delimited by commas. This will be
+      used by OzoneManagers in HA setup to determine all the OzoneManagers
+      belonging to the same OM service in the cluster. For example, if you
+      used "omService1" as the OM service ID previously, and you wanted to
+      use "om1", "om2" and "om3" as the individual IDs of the OzoneManagers,
+      you would configure a property ozone.om.nodes.omService1, and its value
+      "om1,om2,om3".
+    </description>
+  </property>
+  <property>
+    <name>ozone.om.node.id</name>
+    <value></value>
+    <tag>OM, HA</tag>
+    <description>
+      The ID of this OM node. If the OM node ID is not configured it
+      is determined automatically by matching the local node's address
+      with the configured address.
+
+      If the node ID cannot be determined from the configuration, it is set
+      to the OmId from the OM version file.
+    </description>
+  </property>
   <property>
     <name>ozone.om.address</name>
     <value/>
@@ -1452,15 +1494,6 @@
     </description>
   </property>
 
-  <property>
-    <name>ozone.om.ratis.random.port</name>
-    <value>false</value>
-    <tag>OZONE, OM, RATIS, DEBUG</tag>
-    <description>Allocates a random free port for OM's Ratis server. This is
-      used only while running unit tests.
-    </description>
-  </property>
-
   <property>
     <name>ozone.om.ratis.rpc.type</name>
     <value>GRPC</value>
@@ -1543,28 +1576,39 @@
     <description>The timeout duration for OM Ratis client request.
     </description>
   </property>
-	<property>
-		<name>ozone.om.ratis.client.request.max.retries</name>
-		<value>180</value>
-		<tag>OZONE, OM, RATIS, MANAGEMENT</tag>
-		<description>Number of retries for OM client request.</description>
-	</property>
-	<property>
-		<name>ozone.om.ratis.client.request.retry.interval</name>
-		<value>100ms</value>
-		<tag>OZONE, OM, RATIS, MANAGEMENT</tag>
-		<description>Interval between successive retries for a OM client request.
-		</description>
-	</property>
-
-	<property>
-		<name>ozone.om.leader.election.minimum.timeout.duration</name>
-		<value>1s</value>
-		<tag>OZONE, OM, RATIS, MANAGEMENT</tag>
-		<description>The minimum timeout duration for OM ratis leader election.
-			Default is 1s.
-		</description>
-	</property>
+  <property>
+    <name>ozone.om.ratis.client.request.max.retries</name>
+    <value>180</value>
+    <tag>OZONE, OM, RATIS, MANAGEMENT</tag>
+    <description>Number of retries for OM client request.</description>
+  </property>
+  <property>
+    <name>ozone.om.ratis.client.request.retry.interval</name>
+    <value>100ms</value>
+    <tag>OZONE, OM, RATIS, MANAGEMENT</tag>
+    <description>Interval between successive retries for an OM client request.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.om.leader.election.minimum.timeout.duration</name>
+    <value>1s</value>
+    <tag>OZONE, OM, RATIS, MANAGEMENT</tag>
+    <description>The minimum timeout duration for OM ratis leader election.
+      Default is 1s.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.om.ratis.server.failure.timeout.duration</name>
+    <value>120s</value>
+    <tag>OZONE, OM, RATIS, MANAGEMENT</tag>
+    <description>The timeout duration for ratis server failure detection.
+      Once this threshold is reached, the ratis state machine is informed
+      about the failure in the ratis ring.
+    </description>
+  </property>
+
 
   <property>
     <name>ozone.acl.authorizer.class</name>

+ 65 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java

@@ -17,12 +17,15 @@
 
 package org.apache.hadoop.ozone;
 
+import com.google.common.base.Joiner;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Optional;
 
 import org.apache.commons.lang3.RandomStringUtils;
@@ -38,7 +41,9 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_BIND_HOST_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_BIND_PORT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_PORT_DEFAULT;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -191,4 +196,64 @@ public final class OmUtils {
           "This could possibly indicate a faulty JRE");
     }
   }
+
+  /**
+   * Add non empty and non null suffix to a key.
+   */
+  private static String addSuffix(String key, String suffix) {
+    if (suffix == null || suffix.isEmpty()) {
+      return key;
+    }
+    assert !suffix.startsWith(".") :
+        "suffix '" + suffix + "' should not already have '.' prepended.";
+    return key + "." + suffix;
+  }
+
+  /**
+   * Concatenate list of suffix strings '.' separated.
+   */
+  private static String concatSuffixes(String... suffixes) {
+    if (suffixes == null) {
+      return null;
+    }
+    return Joiner.on(".").skipNulls().join(suffixes);
+  }
+
+  /**
+   * Return configuration key of format key.suffix1.suffix2...suffixN.
+   */
+  public static String addKeySuffixes(String key, String... suffixes) {
+    String keySuffix = concatSuffixes(suffixes);
+    return addSuffix(key, keySuffix);
+  }
+
+  /**
+   * Match input address to local address.
+   * Return true if it matches, false otherwise.
+   */
+  public static boolean isAddressLocal(InetSocketAddress addr) {
+    return NetUtils.isLocalAddress(addr.getAddress());
+  }
+
+  /**
+   * Get a collection of all omNodeIds for the given omServiceId.
+   */
+  public static Collection<String> getOMNodeIds(Configuration conf,
+      String omServiceId) {
+    String key = addSuffix(OZONE_OM_NODES_KEY, omServiceId);
+    return conf.getTrimmedStringCollection(key);
+  }
+
+  /**
+   * @return <code>coll</code> if it is non-null and non-empty. Otherwise,
+   * returns a list with a single null value.
+   */
+  public static Collection<String> emptyAsSingletonNull(Collection<String>
+      coll) {
+    if (coll == null || coll.isEmpty()) {
+      return Collections.singletonList(null);
+    } else {
+      return coll;
+    }
+  }
 }

+ 40 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OzoneIllegalArgumentException.java

@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Indicates that a method has been passed illegal or invalid argument. This
+ * exception is thrown instead of IllegalArgumentException to differentiate the
+ * exception thrown in Hadoop implementation from the one thrown in JDK.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class OzoneIllegalArgumentException extends IllegalArgumentException {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Constructs exception with the specified detail message.
+   * @param message detailed message.
+   */
+  public OzoneIllegalArgumentException(final String message) {
+    super(message);
+  }
+}

+ 12 - 5
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java

@@ -40,6 +40,13 @@ public final class OMConfigKeys {
       "ozone.om.handler.count.key";
   public static final int OZONE_OM_HANDLER_COUNT_DEFAULT = 20;
 
+  public static final String OZONE_OM_SERVICE_IDS_KEY =
+      "ozone.om.service.ids";
+  public static final String OZONE_OM_NODES_KEY =
+      "ozone.om.nodes";
+  public static final String OZONE_OM_NODE_ID_KEY =
+      "ozone.om.node.id";
+
   public static final String OZONE_OM_ADDRESS_KEY =
       "ozone.om.address";
   public static final String OZONE_OM_BIND_HOST_DEFAULT =
@@ -101,11 +108,6 @@ public final class OMConfigKeys {
       = "ozone.om.ratis.port";
   public static final int OZONE_OM_RATIS_PORT_DEFAULT
       = 9872;
-  // When set to true, allocate a random free port for ozone ratis server
-  public static final String OZONE_OM_RATIS_RANDOM_PORT_KEY =
-      "ozone.om.ratis.random.port";
-  public static final boolean OZONE_OM_RATIS_RANDOM_PORT_KEY_DEFAULT
-      = false;
   public static final String OZONE_OM_RATIS_RPC_TYPE_KEY
       = "ozone.om.ratis.rpc.type";
   public static final String OZONE_OM_RATIS_RPC_TYPE_DEFAULT
@@ -175,6 +177,11 @@ public final class OMConfigKeys {
   public static final TimeDuration
       OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT =
       TimeDuration.valueOf(1, TimeUnit.SECONDS);
+  public static final String OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY
+      = "ozone.om.ratis.server.failure.timeout.duration";
+  public static final TimeDuration
+      OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+      = TimeDuration.valueOf(120, TimeUnit.SECONDS);
 
   public static final String OZONE_OM_KERBEROS_KEYTAB_FILE_KEY = "ozone.om."
       + "kerberos.keytab.file";

+ 23 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java

@@ -51,6 +51,17 @@ public interface MiniOzoneCluster {
     return new MiniOzoneClusterImpl.Builder(conf);
   }
 
+  /**
+   * Returns the Builder to construct MiniOzoneHACluster.
+   *
+   * @param conf OzoneConfiguration
+   *
+   * @return MiniOzoneCluster builder
+   */
+  static Builder newHABuilder(OzoneConfiguration conf) {
+    return new MiniOzoneHAClusterImpl.Builder(conf);
+  }
+
   /**
    * Returns the configuration object associated with the MiniOzoneCluster.
    *
@@ -223,6 +234,8 @@ public interface MiniOzoneCluster {
     protected final String path;
 
     protected String clusterId;
+    protected String omServiceId;
+    protected int numOfOMs;
 
     protected Optional<Boolean> enableTrace = Optional.of(false);
     protected Optional<Integer> hbInterval = Optional.empty();
@@ -416,6 +429,16 @@ public interface MiniOzoneCluster {
       return this;
     }
 
+    public Builder setNumOfOzoneManagers(int numOMs) {
+      this.numOfOMs = numOMs;
+      return this;
+    }
+
+    public Builder setOMServiceId(String serviceId) {
+      this.omServiceId = serviceId;
+      return this;
+    }
+
     /**
      * Constructs and returns MiniOzoneCluster.
      *

+ 22 - 8
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java

@@ -83,14 +83,14 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
  * StorageContainerManager and multiple DataNodes.
  */
 @InterfaceAudience.Private
-public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
+public class MiniOzoneClusterImpl implements MiniOzoneCluster {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(MiniOzoneClusterImpl.class);
 
   private final OzoneConfiguration conf;
   private StorageContainerManager scm;
-  private final OzoneManager ozoneManager;
+  private OzoneManager ozoneManager;
   private final List<HddsDatanodeService> hddsDatanodes;
 
   // Timeout for the cluster to be ready
@@ -101,7 +101,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
    *
    * @throws IOException if there is an I/O error
    */
-  private MiniOzoneClusterImpl(OzoneConfiguration conf,
+  MiniOzoneClusterImpl(OzoneConfiguration conf,
                                OzoneManager ozoneManager,
                                StorageContainerManager scm,
                                List<HddsDatanodeService> hddsDatanodes) {
@@ -111,6 +111,20 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
     this.hddsDatanodes = hddsDatanodes;
   }
 
+  /**
+   * Creates a new MiniOzoneCluster without the OzoneManager. This is used by
+   * {@link MiniOzoneHAClusterImpl} for starting multiple OzoneManagers.
+   * @param conf
+   * @param scm
+   * @param hddsDatanodes
+   */
+  MiniOzoneClusterImpl(OzoneConfiguration conf, StorageContainerManager scm,
+      List<HddsDatanodeService> hddsDatanodes) {
+    this.conf = conf;
+    this.scm = scm;
+    this.hddsDatanodes = hddsDatanodes;
+  }
+
   public OzoneConfiguration getConf() {
     return conf;
   }
@@ -394,11 +408,11 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
     }
 
     /**
-     * Initializes the configureation required for starting MiniOzoneCluster.
+     * Initializes the configuration required for starting MiniOzoneCluster.
      *
      * @throws IOException
      */
-    private void initializeConfiguration() throws IOException {
+    void initializeConfiguration() throws IOException {
       conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, ozoneEnabled);
       Path metaDir = Paths.get(path, "ozone-meta");
       Files.createDirectories(metaDir);
@@ -434,7 +448,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
      *
      * @throws IOException
      */
-    private StorageContainerManager createSCM()
+    StorageContainerManager createSCM()
         throws IOException, AuthenticationException {
       configureSCM();
       SCMStorageConfig scmStore = new SCMStorageConfig(conf);
@@ -455,7 +469,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
       scmStore.initialize();
     }
 
-    private void initializeOmStorage(OMStorage omStorage) throws IOException{
+    void initializeOmStorage(OMStorage omStorage) throws IOException{
       if (omStorage.getState() == StorageState.INITIALIZED) {
         return;
       }
@@ -487,7 +501,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
      *
      * @throws IOException
      */
-    private List<HddsDatanodeService> createHddsDatanodes(
+    List<HddsDatanodeService> createHddsDatanodes(
         StorageContainerManager scm) throws IOException {
       configureHddsDatanodes();
       String scmAddress =  scm.getDatanodeRpcAddress().getHostString() +

+ 261 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java

@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMStorage;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+
+/**
+ * MiniOzoneHAClusterImpl creates a complete in-process Ozone cluster
+ * with OM HA suitable for running tests.  The cluster consists of a set of
+ * OzoneManagers, StorageContainerManager and multiple DataNodes.
+ */
+public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MiniOzoneHAClusterImpl.class);
+
+  private List<OzoneManager> ozoneManagers;
+
+  private static final Random RANDOM = new Random();
+  private static final int RATIS_LEADER_ELECTION_TIMEOUT = 1000; // 1 second
+
+  public static final int NODE_FAILURE_TIMEOUT = 2000; // 2 seconds
+
+  /**
+   * Creates a new MiniOzoneCluster with OM HA.
+   *
+   * @throws IOException if there is an I/O error
+   */
+
+  private MiniOzoneHAClusterImpl(
+      OzoneConfiguration conf,
+      List<OzoneManager> omList,
+      StorageContainerManager scm,
+      List<HddsDatanodeService> hddsDatanodes) {
+    super(conf, scm, hddsDatanodes);
+    this.ozoneManagers = omList;
+  }
+
+  /**
+   * Returns the first OzoneManager from the list.
+   * @return the OzoneManager at index 0 of the list
+   */
+  @Override
+  public OzoneManager getOzoneManager() {
+    return this.ozoneManagers.get(0);
+  }
+
+  public OzoneManager getOzoneManager(int index) {
+    return this.ozoneManagers.get(index);
+  }
+
+  @Override
+  public void restartOzoneManager() throws IOException {
+    for (OzoneManager ozoneManager : ozoneManagers) {
+      ozoneManager.stop();
+      ozoneManager.restart();
+    }
+  }
+
+  @Override
+  public void stop() {
+    for (OzoneManager ozoneManager : ozoneManagers) {
+      if (ozoneManager != null) {
+        LOG.info("Stopping the OzoneManager " + ozoneManager.getOMNodId());
+        ozoneManager.stop();
+        ozoneManager.join();
+      }
+    }
+    super.stop();
+  }
+
+  public void stopOzoneManager(int index) {
+    ozoneManagers.get(index).stop();
+  }
+
+  /**
+   * Builder for configuring the MiniOzoneCluster to run.
+   */
+  public static class Builder extends MiniOzoneClusterImpl.Builder {
+
+    private final String nodeIdBaseStr = "omNode-";
+
+    /**
+     * Creates a new Builder.
+     *
+     * @param conf configuration
+     */
+    public Builder(OzoneConfiguration conf) {
+      super(conf);
+    }
+
+    @Override
+    public MiniOzoneCluster build() throws IOException {
+      DefaultMetricsSystem.setMiniClusterMode(true);
+      initializeConfiguration();
+      StorageContainerManager scm;
+      List<OzoneManager> omList;
+      try {
+        scm = createSCM();
+        scm.start();
+        omList = createOMService();
+      } catch (AuthenticationException ex) {
+        throw new IOException("Unable to build MiniOzoneCluster. ", ex);
+      }
+
+      final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
+      MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omList,
+          scm, hddsDatanodes);
+      if (startDataNodes) {
+        cluster.startHddsDatanodes();
+      }
+      return cluster;
+    }
+
+    /**
+     * Initialize OM configurations.
+     * @throws IOException
+     */
+    @Override
+    void initializeConfiguration() throws IOException {
+      super.initializeConfiguration();
+      conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
+      conf.setInt(OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY, numOfOmHandlers);
+      conf.setTimeDuration(
+          OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+          RATIS_LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+      conf.setTimeDuration(
+          OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
+          NODE_FAILURE_TIMEOUT, TimeUnit.MILLISECONDS);
+      conf.setInt(OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY,
+          10);
+    }
+
+    /**
+     * Start OM service with multiple OMs.
+     * @return list of OzoneManagers
+     * @throws IOException
+     * @throws AuthenticationException
+     */
+    private List<OzoneManager> createOMService() throws IOException,
+        AuthenticationException {
+
+      List<OzoneManager> omList = new ArrayList<>(numOfOMs);
+
+      int retryCount = 0;
+      int basePort = 10000;
+
+      while (true) {
+        try {
+          basePort = 10000 + RANDOM.nextInt(1000) * 4;
+          initHAConfig(basePort);
+
+          for (int i = 1; i<= numOfOMs; i++) {
+            // Set nodeId
+            conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeIdBaseStr + i);
+
+            // Set metadata/DB dir base path
+            String metaDirPath = path + "/" + nodeIdBaseStr + i;
+            conf.set(OZONE_METADATA_DIRS, metaDirPath);
+            OMStorage omStore = new OMStorage(conf);
+            initializeOmStorage(omStore);
+
+            // Set HTTP address to the rpc port + 2
+            int httpPort = basePort + (6*i) - 4;
+            conf.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY,
+                "127.0.0.1:" + httpPort);
+
+            OzoneManager om = OzoneManager.createOm(null, conf);
+            om.setCertClient(certClient);
+            omList.add(om);
+
+            om.start();
+            LOG.info("Started OzoneManager RPC server at " +
+                om.getOmRpcServerAddr());
+          }
+
+          // Set default OM address to point to the first OM. Clients would
+          // try connecting to this address by default
+          conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
+              NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr()));
+
+          break;
+        } catch (BindException e) {
+          for (OzoneManager om : omList) {
+            om.stop();
+            om.join();
+            LOG.info("Stopping OzoneManager server at " +
+                om.getOmRpcServerAddr());
+          }
+          omList.clear();
+          ++retryCount;
+          LOG.info("MiniOzoneHACluster port conflicts, retried " +
+              retryCount + " times");
+        }
+      }
+      return omList;
+    }
+
+    /**
+     * Initialize HA related configurations.
+     */
+    private void initHAConfig(int basePort) throws IOException {
+      // Set configurations required for starting OM HA service
+      conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServiceId);
+      String omNodesKey = OmUtils.addKeySuffixes(
+          OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId);
+      StringBuilder omNodesKeyValue = new StringBuilder();
+
+      int port = basePort;
+
+      for (int i = 1; i <= numOfOMs; i++, port+=6) {
+        String omNodeId = nodeIdBaseStr + i;
+        omNodesKeyValue.append(",").append(omNodeId);
+        String omAddrKey = OmUtils.addKeySuffixes(
+            OMConfigKeys.OZONE_OM_ADDRESS_KEY, omServiceId, omNodeId);
+        String omRatisPortKey = OmUtils.addKeySuffixes(
+            OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, omServiceId, omNodeId);
+
+        conf.set(omAddrKey, "127.0.0.1:" + port);
+        // Reserve port+2 for OMs HTTP server
+        conf.setInt(omRatisPortKey, port + 4);
+      }
+
+      conf.set(omNodesKey, omNodesKeyValue.substring(1));
+    }
+  }
+}

+ 2 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java

@@ -38,6 +38,7 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase {
     errorIfMissingConfigProps = true;
     errorIfMissingXmlProps = true;
     xmlPropsToSkipCompare.add("hadoop.tags.custom");
+    xmlPropsToSkipCompare.add("ozone.om.nodes.EXAMPLEOMSERVICEID");
     addPropertiesNotInXml();
   }
 
@@ -45,5 +46,6 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase {
     configurationPropsToSkipCompare.add(HddsConfigKeys.HDDS_KEY_ALGORITHM);
     configurationPropsToSkipCompare.add(HddsConfigKeys.HDDS_SECURITY_PROVIDER);
     configurationPropsToSkipCompare.add(HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT);
+    configurationPropsToSkipCompare.add(OMConfigKeys.OZONE_OM_NODES_KEY);
   }
 }

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java

@@ -1381,7 +1381,7 @@ public class TestOzoneManager {
    * set to true.
    */
   @Test
-  public void testRatsiServerOnOmInitialization() throws IOException {
+  public void testRatisServerOnOMInitialization() throws IOException {
     // OM Ratis server should not be started when OZONE_OM_RATIS_ENABLE_KEY
     // is not set to true
     Assert.assertNull("OM Ratis server started though OM Ratis is disabled.",

+ 290 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerConfiguration.java

@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.LifeCycle;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests OM related configurations.
+ */
+public class TestOzoneManagerConfiguration {
+
+  private OzoneConfiguration conf;
+  private MiniOzoneCluster cluster;
+  private String omId;
+  private String clusterId;
+  private String scmId;
+  private OzoneManager om;
+  private OzoneManagerRatisServer omRatisServer;
+
+  private static final long LEADER_ELECTION_TIMEOUT = 500L;
+
+  /**
+   * Prepares a fresh configuration with OM Ratis enabled and initializes
+   * the OM storage version file before each test.
+   *
+   * @throws IOException if the OM storage cannot be initialized
+   */
+  @Before
+  public void init() throws IOException {
+    // Generate new identifiers for this test run.
+    omId = UUID.randomUUID().toString();
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+
+    // OM metadata is placed under a temp path derived from the omId.
+    final Path omMetaDir =
+        Paths.get(GenericTestUtils.getTempPath(omId), "om-meta");
+
+    conf = new OzoneConfiguration();
+    conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, omMetaDir.toString());
+    conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
+    conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
+    conf.setTimeDuration(
+        OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+        LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+
+    // Initializing the storage writes the version file properties.
+    OMStorage storage = new OMStorage(conf);
+    storage.setClusterId("testClusterId");
+    storage.setScmId("testScmId");
+    storage.initialize();
+  }
+
+  /**
+   * Shuts down the mini cluster after each test, if one was started.
+   */
+  @After
+  public void shutdown() {
+    if (cluster == null) {
+      // The test never started a cluster; nothing to tear down.
+      return;
+    }
+    cluster.shutdown();
+  }
+
+  /**
+   * Builds a MiniOzoneCluster from the current {@code conf} using the
+   * clusterId, scmId and omId generated in {@link #init()}, and blocks until
+   * the cluster reports that it is ready.
+   *
+   * @throws Exception if building the cluster or waiting for it fails
+   */
+  private void startCluster() throws Exception {
+    cluster =  MiniOzoneCluster.newBuilder(conf)
+      .setClusterId(clusterId)
+      .setScmId(scmId)
+      .setOmId(omId)
+      .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  /**
+   * Starts the cluster with the default MiniOzoneCluster settings and checks
+   * that the OM Ratis server is running with itself as the only RaftGroup
+   * member.
+   * @throws Exception
+   */
+  @Test
+  public void testSingleNodeOMservice() throws Exception {
+    // Default settings of MiniOzoneCluster start a single node OM service.
+    startCluster();
+    om = cluster.getOzoneManager();
+    omRatisServer = om.getOmRatisServer();
+
+    Assert.assertEquals(LifeCycle.State.RUNNING, om.getOmRatisServerState());
+
+    // The RaftGroup must contain exactly one peer: this OM itself.
+    Collection<RaftPeer> members = omRatisServer.getRaftGroup().getPeers();
+    Assert.assertEquals(1, members.size());
+
+    // That single peer's id must be the configured omId.
+    RaftPeer onlyPeer = members.iterator().next();
+    Assert.assertEquals(omId, onlyPeer.getId().toString());
+  }
+
+  /**
+   * Test configuring an OM service with three OM nodes.
+   * @throws Exception
+   */
+  @Test
+  public void testThreeNodeOMservice() throws Exception {
+    // Set the configuration for 3 node OM service. Set one node's rpc
+    // address to localhost. OM will parse all configurations and find the
+    // nodeId representing the localhost
+
+    final String omServiceId = "om-service-test1";
+    final String omNode1Id = "omNode1";
+    final String omNode2Id = "omNode2";
+    final String omNode3Id = "omNode3";
+
+    String omNodesKeyValue = omNode1Id + "," + omNode2Id + "," + omNode3Id;
+    String omNodesKey = OmUtils.addKeySuffixes(
+        OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId);
+
+    String omNode1RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode1Id);
+    String omNode2RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode2Id);
+    String omNode3RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode3Id);
+
+    String omNode3RatisPortKey = OmUtils.addKeySuffixes(
+        OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, omServiceId, omNode3Id);
+
+    conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServiceId);
+    conf.set(omNodesKey, omNodesKeyValue);
+
+    // Set node2 to localhost and the other two nodes to dummy addresses
+    conf.set(omNode1RpcAddrKey, "123.0.0.123:9862");
+    conf.set(omNode2RpcAddrKey, "0.0.0.0:9862");
+    conf.set(omNode3RpcAddrKey, "124.0.0.124:9862");
+
+    // Only node3 gets an explicit Ratis port; node1 and node2 are left unset
+    conf.setInt(omNode3RatisPortKey, 9898);
+
+    startCluster();
+    om = cluster.getOzoneManager();
+    omRatisServer = om.getOmRatisServer();
+
+    Assert.assertEquals(LifeCycle.State.RUNNING, om.getOmRatisServerState());
+
+    // OM's Ratis server should have 3 peers in its RaftGroup
+    Collection<RaftPeer> peers = omRatisServer.getRaftGroup().getPeers();
+    Assert.assertEquals(3, peers.size());
+
+    // Ratis server RaftPeerId should match with omNode2 ID as node2 is the
+    // localhost
+    Assert.assertEquals(omNode2Id, omRatisServer.getRaftPeerId().toString());
+
+    // Verify peer details
+    for (RaftPeer peer : peers) {
+      String expectedPeerAddress = null;
+      switch (peer.getId().toString()) {
+      case omNode1Id :
+        // Ratis port is not set for node1. So it should take the default port
+        expectedPeerAddress = "123.0.0.123:" +
+            OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT;
+        break;
+      case omNode2Id :
+        // Ratis port is not set for node2. So it should take the default port
+        expectedPeerAddress = "0.0.0.0:"+
+            OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT;
+        break;
+      case omNode3Id :
+        // Ratis port is explicitly set to 9898 for node3 above
+        expectedPeerAddress = "124.0.0.124:9898";
+        break;
+      default : Assert.fail("Unrecognized RaftPeerId");
+      }
+      Assert.assertEquals(expectedPeerAddress, peer.getAddress());
+    }
+  }
+
+  /**
+   * Test a wrong configuration for OM HA. A configuration with none of the
+   * OM addresses matching the local address should throw an error.
+   * @throws Exception
+   */
+  @Test
+  public void testWrongConfiguration() throws Exception {
+    String omServiceId = "om-service-test1";
+
+    String omNode1Id = "omNode1";
+    String omNode2Id = "omNode2";
+    String omNode3Id = "omNode3";
+    String omNodesKeyValue = omNode1Id + "," + omNode2Id + "," + omNode3Id;
+    String omNodesKey = OmUtils.addKeySuffixes(
+        OMConfigKeys.OZONE_OM_NODES_KEY, omServiceId);
+
+    String omNode1RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode1Id);
+    String omNode2RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode2Id);
+    String omNode3RpcAddrKey = getOMAddrKeyWithSuffix(omServiceId, omNode3Id);
+
+    conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServiceId);
+    conf.set(omNodesKey, omNodesKeyValue);
+
+    // Set all three nodes to dummy addresses so that none of them is meant
+    // to match the local node's address
+    conf.set(omNode1RpcAddrKey, "123.0.0.123:9862");
+    conf.set(omNode2RpcAddrKey, "125.0.0.2:9862");
+    conf.set(omNode3RpcAddrKey, "124.0.0.124:9862");
+
+    try {
+      startCluster();
+      Assert.fail("Wrong Configuration. OM initialization should have failed.");
+    } catch (OzoneIllegalArgumentException e) {
+      // The message must be the "no matching local address" error
+      GenericTestUtils.assertExceptionContains("Configuration has no " +
+          OMConfigKeys.OZONE_OM_ADDRESS_KEY + " address that matches local " +
+          "node's address.", e);
+    }
+  }
+
+  /**
+   * Test multiple OM service configuration. Two service ids are configured
+   * with three nodes each; only one node (omNode2 of the second service) is
+   * given the 0.0.0.0 address, and the OM is expected to pick that
+   * serviceId and nodeId.
+   */
+  @Test
+  public void testMultipleOMServiceIds() throws Exception {
+    // Set up OZONE_OM_SERVICE_IDS_KEY with 2 service Ids.
+    String om1ServiceId = "om-service-test1";
+    String om2ServiceId = "om-service-test2";
+    String omServices = om1ServiceId + "," + om2ServiceId;
+    conf.set(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY, omServices);
+
+    String omNode1Id = "omNode1";
+    String omNode2Id = "omNode2";
+    String omNode3Id = "omNode3";
+    String omNodesKeyValue = omNode1Id + "," + omNode2Id + "," + omNode3Id;
+
+    // Set the node Ids for the 2 services. The nodeIds need to be
+    // distinct within one service. The ids can overlap between
+    // different services.
+    String om1NodesKey = OmUtils.addKeySuffixes(
+        OMConfigKeys.OZONE_OM_NODES_KEY, om1ServiceId);
+    String om2NodesKey = OmUtils.addKeySuffixes(
+        OMConfigKeys.OZONE_OM_NODES_KEY, om2ServiceId);
+    conf.set(om1NodesKey, omNodesKeyValue);
+    conf.set(om2NodesKey, omNodesKeyValue);
+
+    // Set the RPC addresses for all 6 OMs (3 for each service). Only one
+    // node out of these must have the localhost address.
+    conf.set(getOMAddrKeyWithSuffix(om1ServiceId, omNode1Id),
+        "122.0.0.123:9862");
+    conf.set(getOMAddrKeyWithSuffix(om1ServiceId, omNode2Id),
+        "123.0.0.124:9862");
+    conf.set(getOMAddrKeyWithSuffix(om1ServiceId, omNode3Id),
+        "124.0.0.125:9862");
+    conf.set(getOMAddrKeyWithSuffix(om2ServiceId, omNode1Id),
+        "125.0.0.126:9862");
+    conf.set(getOMAddrKeyWithSuffix(om2ServiceId, omNode2Id),
+        "0.0.0.0:9862");
+    conf.set(getOMAddrKeyWithSuffix(om2ServiceId, omNode3Id),
+        "126.0.0.127:9862");
+
+    startCluster();
+    om = cluster.getOzoneManager();
+    omRatisServer = om.getOmRatisServer();
+
+    Assert.assertEquals(LifeCycle.State.RUNNING, om.getOmRatisServerState());
+
+    // OM's Ratis server should have 3 peers in its RaftGroup
+    Collection<RaftPeer> peers = omRatisServer.getRaftGroup().getPeers();
+    Assert.assertEquals(3, peers.size());
+
+    // Verify that the serviceId and nodeId match the node with the localhost
+    // address - om-service-test2 and omNode2
+    Assert.assertEquals(om2ServiceId, om.getOMServiceId());
+    Assert.assertEquals(omNode2Id, omRatisServer.getRaftPeerId().toString());
+  }
+
+  private String getOMAddrKeyWithSuffix(String serviceId, String nodeId) {
+    return OmUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
+        serviceId, nodeId);
+  }
+}

+ 156 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java

@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
+import org.apache.hadoop.ozone.*;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.response.VolumeInfo;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.junit.*;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
+    .NODE_FAILURE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
+
+/**
+ * Test Ozone Manager operation in distributed handler scenario.
+ */
+public class TestOzoneManagerHA {
+
+  private MiniOzoneHAClusterImpl cluster = null;
+  private StorageHandler storageHandler;
+  private UserArgs userArgs;
+  private OzoneConfiguration conf;
+  private String clusterId;
+  private String scmId;
+  private int numOfOMs = 3;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Rule
+  public Timeout timeout = new Timeout(60_000);
+
+  /**
+   * Create a MiniOzoneHAClusterImpl with {@code numOfOMs} OzoneManagers for
+   * testing.
+   * <p>
+   * ACLs are enabled and the open key expire threshold is set to 2 seconds
+   * before the cluster is built. A storage handler and user args are then
+   * created for issuing client requests.
+   *
+   * @throws Exception if building the cluster or waiting for it fails
+   */
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+    conf.setBoolean(OZONE_ACL_ENABLED, true);
+    conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
+
+    cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
+        .setClusterId(clusterId)
+        .setScmId(scmId)
+        .setOMServiceId("om-service-test1")
+        .setNumOfOzoneManagers(numOfOMs)
+        .build();
+    cluster.waitForClusterToBeReady();
+    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+        null, null, null, null);
+  }
+
+  /**
+   * Shuts down the mini Ozone HA cluster, if one was created.
+   */
+  @After
+  public void shutdown() {
+    if (cluster == null) {
+      // init() did not get as far as creating the cluster.
+      return;
+    }
+    cluster.shutdown();
+  }
+
+  /**
+   * Test a client request when all OM nodes are running. The request should
+   * succeed, so the volume is created and verified with checkSuccess = true.
+   * @throws Exception
+   */
+  @Test
+  public void testAllOMNodesRunning() throws Exception {
+    testCreateVolume(true);
+  }
+
+  /**
+   * Test client request succeeds even if one OM is down.
+   */
+  @Test
+  public void testOneOMNodeDown() throws Exception {
+    cluster.stopOzoneManager(1);
+    // Wait for twice the node failure timeout before issuing the request
+    Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
+
+    testCreateVolume(true);
+  }
+
+  /**
+   * Test client request fails when 2 OMs are down.
+   */
+  @Test
+  public void testTwoOMNodesDown() throws Exception {
+    cluster.stopOzoneManager(1);
+    cluster.stopOzoneManager(2);
+    // Wait for twice the node failure timeout before issuing the request
+    Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
+
+    testCreateVolume(false);
+  }
+
+  /**
+   * Creates a volume with randomly generated volume, user and admin names
+   * through the storage handler, reads the volume info back and verifies it.
+   *
+   * @param checkSuccess if true, asserts that the returned volume name and
+   *                     owner match what was requested; if false, asserts
+   *                     that the returned volume name is empty
+   * @throws Exception if a storage handler call fails
+   */
+  private void testCreateVolume(boolean checkSuccess) throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+
+    // NOTE(review): when checkSuccess is false this call is not guarded; if
+    // the handler throws on failure the exception propagates to the test.
+    storageHandler.createVolume(createVolumeArgs);
+
+    VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    VolumeInfo retVolumeinfo = storageHandler.getVolumeInfo(getVolumeArgs);
+
+    if (checkSuccess) {
+      // assertEquals reports expected and actual values on a mismatch
+      Assert.assertEquals(volumeName, retVolumeinfo.getVolumeName());
+      Assert.assertEquals(userName, retVolumeinfo.getOwner().getName());
+    } else {
+      // Verify that the request failed
+      Assert.assertTrue(retVolumeinfo.getVolumeName().isEmpty());
+    }
+  }
+}

+ 107 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMNodeDetails.java

@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.net.NetUtils;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * This class stores OM node details: the OM service id and node id, the
+ * node's RPC address (and its port) and the node's Ratis port.
+ * <p>
+ * Instances are created through {@link Builder} and expose no setters; all
+ * fields are final.
+ */
+public final class OMNodeDetails {
+  private final String omServiceId;
+  private final String omNodeId;
+  private final InetSocketAddress rpcAddress;
+  private final int rpcPort;
+  private final int ratisPort;
+
+  /**
+   * Constructs OMNodeDetails object.
+   */
+  private OMNodeDetails(String serviceId, String nodeId,
+      InetSocketAddress rpcAddr, int rpcPort, int ratisPort) {
+    this.omServiceId = serviceId;
+    this.omNodeId = nodeId;
+    this.rpcAddress = rpcAddr;
+    this.rpcPort = rpcPort;
+    this.ratisPort = ratisPort;
+  }
+
+  /**
+   * Builder class for OMNodeDetails.
+   */
+  public static class Builder {
+    private String omServiceId;
+    private String omNodeId;
+    private InetSocketAddress rpcAddress;
+    private int rpcPort;
+    private int ratisPort;
+
+    /**
+     * Sets the RPC address; the RPC port is taken from the same address.
+     *
+     * @param rpcAddr RPC address of the OM node, must not be null
+     * @return this builder
+     */
+    public Builder setRpcAddress(InetSocketAddress rpcAddr) {
+      this.rpcAddress = rpcAddr;
+      this.rpcPort = rpcAddress.getPort();
+      return this;
+    }
+
+    /**
+     * @param port Ratis port of the OM node
+     * @return this builder
+     */
+    public Builder setRatisPort(int port) {
+      this.ratisPort = port;
+      return this;
+    }
+
+    /**
+     * @param serviceId id of the OM service the node belongs to
+     * @return this builder
+     */
+    public Builder setOMServiceId(String serviceId) {
+      this.omServiceId = serviceId;
+      return this;
+    }
+
+    /**
+     * @param nodeId id of the OM node
+     * @return this builder
+     */
+    public Builder setOMNodeId(String nodeId) {
+      this.omNodeId = nodeId;
+      return this;
+    }
+
+    /**
+     * @return a new OMNodeDetails holding the values set on this builder
+     */
+    public OMNodeDetails build() {
+      return new OMNodeDetails(omServiceId, omNodeId, rpcAddress, rpcPort,
+          ratisPort);
+    }
+  }
+
+  /** @return the OM service id. */
+  public String getOMServiceId() {
+    return omServiceId;
+  }
+
+  /** @return the OM node id. */
+  public String getOMNodeId() {
+    return omNodeId;
+  }
+
+  /** @return the RPC socket address of the OM node. */
+  public InetSocketAddress getRpcAddress() {
+    return rpcAddress;
+  }
+
+  /** @return the InetAddress part of the RPC socket address. */
+  public InetAddress getAddress() {
+    return rpcAddress.getAddress();
+  }
+
+  /** @return the Ratis port of the OM node. */
+  public int getRatisPort() {
+    return ratisPort;
+  }
+
+  /** @return the RPC address formatted as a host:port string. */
+  public String getRpcAddressString() {
+    return NetUtils.getHostPortString(rpcAddress);
+  }
+}

+ 252 - 33
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -25,8 +25,10 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.BlockingService;
 
 import java.security.KeyPair;
+import java.util.Collection;
 import java.util.Objects;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -50,6 +52,7 @@ import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.security.OzoneSecurityException;
@@ -144,7 +147,6 @@ import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
 import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
-import static org.apache.hadoop.ozone.OmUtils.getOmAddress;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT;
@@ -161,6 +163,12 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys
     .OZONE_OM_METRICS_SAVE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OMConfigKeys
     .OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODE_ID_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+    .OZONE_OM_RATIS_PORT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_KEY;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
 import static org.apache.hadoop.ozone.protocol.proto
@@ -200,6 +208,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private RPC.Server omRpcServer;
   private InetSocketAddress omRpcAddress;
   private String omId;
+  private OMNodeDetails omNodeDetails;
+  private List<OMNodeDetails> peerNodes;
+  private boolean isRatisEnabled;
   private OzoneManagerRatisServer omRatisServer;
   private OzoneManagerRatisClient omRatisClient;
   private final OMMetadataManager metadataManager;
@@ -229,7 +240,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private final S3SecretManager s3SecretManager;
   private volatile boolean isOmRpcServerRunning = false;
 
-  private OzoneManager(OzoneConfiguration conf) throws IOException {
+  private OzoneManager(OzoneConfiguration conf) throws IOException,
+      AuthenticationException {
     super(OzoneVersionInfo.OZONE_VERSION_INFO);
     Preconditions.checkNotNull(conf);
     configuration = conf;
@@ -240,6 +252,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
           ResultCodes.OM_NOT_INITIALIZED);
     }
 
+    // Load HA related configurations
+    loadOMHAConfigs(configuration);
+
+    // Authenticate KSM if security is enabled
+    if (securityEnabled) {
+      loginOMUser(configuration);
+    }
+
     if (!testSecureOmFlag || !isOzoneSecurityEnabled()) {
       scmContainerClient = getScmContainerClient(configuration);
       // verifies that the SCM info in the OM Version file is correct.
@@ -256,12 +276,15 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       scmBlockClient = null;
     }
 
-
     RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
         ProtobufRpcEngine.class);
 
+    startRatisServer();
+    startRatisClient();
+
+    InetSocketAddress omNodeRpcAddr = omNodeDetails.getRpcAddress();
+    omRpcAddressTxt = new Text(omNodeDetails.getRpcAddressString());
 
-    omRpcAddressTxt = new Text(OmUtils.getOmRpcAddress(configuration));
     secConfig = new SecurityConfig(configuration);
     if (secConfig.isBlockTokenEnabled()) {
       blockTokenMgr = createBlockTokenSecretManager(configuration);
@@ -269,7 +292,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     if(secConfig.isSecurityEnabled()){
       delegationTokenMgr = createDelegationTokenSecretManager(configuration);
     }
-    InetSocketAddress omNodeRpcAddr = getOmAddress(configuration);
+
     omRpcServer = getRpcServer(conf);
     omRpcAddress = updateRPCListenAddress(configuration,
         OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
@@ -297,7 +320,157 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       accessAuthorizer = null;
     }
     omMetaDir = OmUtils.getOmDbDir(configuration);
+  }
+
+  /**
+   * Inspects and loads OM node configurations.
+   *
+   * If {@link OMConfigKeys#OZONE_OM_SERVICE_IDS_KEY} is configured with
+   * multiple ids and/or if {@link OMConfigKeys#OZONE_OM_NODE_ID_KEY} is not
+   * specifically configured, this method determines the omServiceId
+   * and omNodeId by matching the node's address with the configured
+   * addresses. When a match is found, it sets the omServiceId and omNodeId
+   * from the corresponding configuration key. This method also finds the OM
+   * peer nodes belonging to the same OM service.
+   *
+   * If no suffixed OM address key is configured at all, the OM falls back to
+   * the address returned by {@code OmUtils.getOmAddress(conf)} and the
+   * un-suffixed Ratis port key.
+   *
+   * @param conf configuration to read the OM service ids, node ids, RPC
+   *             addresses and Ratis ports from
+   * @throws OzoneIllegalArgumentException if more than one configured
+   *         address matches the local node, or if OM addresses are
+   *         configured but none of them matches the local node
+   */
+  private void loadOMHAConfigs(Configuration conf) {
+    InetSocketAddress localRpcAddress = null;
+    String localOMServiceId = null;
+    String localOMNodeId = null;
+    int localRatisPort = 0;
+    Collection<String> omServiceIds = conf.getTrimmedStringCollection(
+        OZONE_OM_SERVICE_IDS_KEY);
+
+    String knownOMNodeId = conf.get(OZONE_OM_NODE_ID_KEY);
+    int found = 0;
+    boolean isOMAddressSet = false;
+
+    for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
+      Collection<String> omNodeIds = OmUtils.getOMNodeIds(conf, serviceId);
+
+      List<OMNodeDetails> peerNodesList = new ArrayList<>();
+      for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+        // When a node id is explicitly configured, every other node id is
+        // treated as a peer without checking whether its address is local.
+        boolean isPeer = knownOMNodeId != null
+            && !knownOMNodeId.equals(nodeId);
+
+        String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+            serviceId, nodeId);
+        String rpcAddrStr = conf.get(rpcAddrKey);
+        if (rpcAddrStr == null) {
+          continue;
+        }
+
+        // If OM address is set for any node id, we will not fallback to the
+        // default
+        isOMAddressSet = true;
+
+        String ratisPortKey = OmUtils.addKeySuffixes(OZONE_OM_RATIS_PORT_KEY,
+            serviceId, nodeId);
+        int ratisPort = conf.getInt(ratisPortKey, OZONE_OM_RATIS_PORT_DEFAULT);
+
+        InetSocketAddress addr;
+        try {
+          addr = NetUtils.createSocketAddr(rpcAddrStr);
+        } catch (Exception e) {
+          // addr is not assigned when creation fails, so log the configured
+          // address string that could not be parsed.
+          LOG.warn("Exception in creating socket address {}", rpcAddrStr, e);
+          continue;
+        }
+        if (!addr.isUnresolved()) {
+          if (!isPeer && OmUtils.isAddressLocal(addr)) {
+            localRpcAddress = addr;
+            localOMServiceId = serviceId;
+            localOMNodeId = nodeId;
+            localRatisPort = ratisPort;
+            found++;
+          } else {
+            // This OMNode belongs to same OM service as the current OMNode.
+            // Add it to peerNodes list.
+            OMNodeDetails peerNodeInfo = new OMNodeDetails.Builder()
+                .setOMServiceId(serviceId)
+                .setOMNodeId(nodeId)
+                .setRpcAddress(addr)
+                .setRatisPort(ratisPort)
+                .build();
+            peerNodesList.add(peerNodeInfo);
+          }
+        }
+      }
+      if (found == 1) {
+        LOG.debug("Found one matching OM address with service ID: {} and node" +
+                " ID: {}", localOMServiceId, localOMNodeId);
+
+        setOMNodeDetails(localOMServiceId, localOMNodeId, localRpcAddress,
+            localRatisPort);
+        this.peerNodes = peerNodesList;
+
+        LOG.info("Found matching OM address with OMServiceId: {}, " +
+            "OMNodeId: {}, RPC Address: {} and Ratis port: {}",
+            localOMServiceId, localOMNodeId,
+            NetUtils.getHostPortString(localRpcAddress), localRatisPort);
+        return;
+      } else if (found > 1) {
+        String msg = "Configuration has multiple " + OZONE_OM_ADDRESS_KEY +
+            " addresses that match local node's address. Please configure the" +
+            " system with " + OZONE_OM_SERVICE_IDS_KEY + " and " +
+            OZONE_OM_ADDRESS_KEY;
+        throw new OzoneIllegalArgumentException(msg);
+      }
+    }
+
+    if (!isOMAddressSet) {
+      // No OM address is set. Fallback to default
+      InetSocketAddress omAddress = OmUtils.getOmAddress(conf);
+      int ratisPort = conf.getInt(OZONE_OM_RATIS_PORT_KEY,
+          OZONE_OM_RATIS_PORT_DEFAULT);
+
+      LOG.info("Configuration has no {} set. Falling back to the default " +
+          "OM address {}", OZONE_OM_ADDRESS_KEY, omAddress);
+
+      setOMNodeDetails(null, null, omAddress, ratisPort);
+
+    } else {
+      String msg = "Configuration has no " + OZONE_OM_ADDRESS_KEY + " " +
+          "address that matches local node's address. Please configure the " +
+          "system with " + OZONE_OM_ADDRESS_KEY;
+      LOG.info(msg);
+      throw new OzoneIllegalArgumentException(msg);
+    }
+  }
+
+  /**
+   * Builds the OMNodeDetails for this OM and stores it in
+   * {@code omNodeDetails}. A null service id is replaced by
+   * {@code OzoneConsts.OM_SERVICE_ID_DEFAULT} and a null node id by this
+   * OM's {@code omId}. The un-suffixed OM address key in the configuration
+   * is also overwritten with the given RPC address.
+   *
+   * @param serviceId OM service id, or null to use the default
+   * @param nodeId OM node id, or null to use the omId
+   * @param rpcAddress RPC address of this OM
+   * @param ratisPort Ratis port of this OM
+   */
+  private void setOMNodeDetails(String serviceId, String nodeId,
+      InetSocketAddress rpcAddress, int ratisPort) {
+
+    // Resolve the service id, defaulting when none was discovered.
+    String effectiveServiceId = serviceId;
+    if (effectiveServiceId == null) {
+      effectiveServiceId = OzoneConsts.OM_SERVICE_ID_DEFAULT;
+      LOG.info("OM Service ID is not set. Setting it to the default ID: {}",
+          effectiveServiceId);
+    }
+
+    // Resolve the node id, defaulting to the omId when none was discovered.
+    String effectiveNodeId = nodeId;
+    if (effectiveNodeId == null) {
+      effectiveNodeId = omId;
+      LOG.info("OM Node ID is not set. Setting it to the OmStorage's " +
+          "OmID: {}", effectiveNodeId);
+    }
+
+    OMNodeDetails.Builder detailsBuilder = new OMNodeDetails.Builder();
+    detailsBuilder.setOMServiceId(effectiveServiceId);
+    detailsBuilder.setOMNodeId(effectiveNodeId);
+    detailsBuilder.setRpcAddress(rpcAddress);
+    detailsBuilder.setRatisPort(ratisPort);
+    this.omNodeDetails = detailsBuilder.build();
+
+    // Publish the discovered address under this node's OZONE_OM_ADDRESS_KEY.
+    String discoveredAddr = NetUtils.getHostPortString(rpcAddress);
+    configuration.set(OZONE_OM_ADDRESS_KEY, discoveredAddr);
   }
 
   /**
@@ -479,7 +652,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    * @param  conf
    * @throws IOException, AuthenticationException
    */
-  private static void loginOMUser(OzoneConfiguration conf)
+  private void loginOMUser(OzoneConfiguration conf)
       throws IOException, AuthenticationException {
 
     if (SecurityUtil.getAuthenticationMethod(conf).equals(
@@ -491,7 +664,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
       UserGroupInformation.setConfiguration(conf);
 
-      InetSocketAddress socAddr = getOmAddress(conf);
+      InetSocketAddress socAddr = OmUtils.getOmAddress(conf);
       SecurityUtil.login(conf, OZONE_OM_KERBEROS_KEYTAB_FILE_KEY,
           OZONE_OM_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
     } else {
@@ -660,10 +833,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
 
     securityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
-    // Authenticate KSM if security is enabled
-    if (securityEnabled) {
-      loginOMUser(conf);
-    }
+
     switch (startOpt) {
     case INIT:
       if (printBanner) {
@@ -792,6 +962,16 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     return omStorage;
   }
 
+  /**
+   * @return the OM Ratis server held by this OzoneManager (the
+   *         {@code omRatisServer} field, returned as-is)
+   */
+  @VisibleForTesting
+  public OzoneManagerRatisServer getOmRatisServer() {
+    return omRatisServer;
+  }
+
+  /**
+   * @return the OM RPC address held by this OzoneManager (the
+   *         {@code omRpcAddress} field, returned as-is)
+   */
+  @VisibleForTesting
+  public InetSocketAddress getOmRpcServerAddr() {
+    return omRpcAddress;
+  }
+
   @VisibleForTesting
   public LifeCycle.State getOmRatisServerState() {
     if (omRatisServer == null) {
@@ -866,7 +1046,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     LOG.info(buildRpcServerStartMessage("OzoneManager RPC server",
         omRpcAddress));
 
-
     DefaultMetricsSystem.initialize("OzoneManager");
 
     metadataManager.start(configuration);
@@ -894,6 +1073,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     omRpcServer = getRpcServer(configuration);
     omRpcServer.start();
     isOmRpcServerRunning = true;
+
+    startRatisServer();
+    startRatisClient();
+
     try {
       httpServer = new OzoneManagerHttpServer(configuration, this);
       httpServer.start();
@@ -919,40 +1102,65 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       return omRpcServer;
     }
 
-    InetSocketAddress omNodeRpcAddr = getOmAddress(configuration);
+    InetSocketAddress omNodeRpcAddr = OmUtils.getOmAddress(configuration);
+
+    final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY,
+        OZONE_OM_HANDLER_COUNT_DEFAULT);
+    RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
+        ProtobufRpcEngine.class);
+
+    BlockingService omService = newReflectiveBlockingService(
+        new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisClient,
+            isRatisEnabled));
+    return startRpcServer(configuration, omNodeRpcAddr,
+        OzoneManagerProtocolPB.class, omService,
+        handlerCount);
+  }
+
+  /**
+   * If Ratis is enabled, creates the Ratis server (if absent) and starts it.
+   */
+  private void startRatisServer() throws IOException {
     // This is a temporary check. Once fully implemented, all OM state change
-    // should go through Ratis - either standalone (for non-HA) or replicated
+    // should go through Ratis - be it standalone (for non-HA) or replicated
     // (for HA).
-    boolean omRatisEnabled = configuration.getBoolean(
+    isRatisEnabled = configuration.getBoolean(
         OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
         OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
-    if (omRatisEnabled) {
-      omRatisServer = OzoneManagerRatisServer.newOMRatisServer(this, omId,
-          omNodeRpcAddr.getAddress(), configuration);
+    if (isRatisEnabled) {
+      if (omRatisServer == null) {
+        omRatisServer = OzoneManagerRatisServer.newOMRatisServer(
+            configuration, this, omNodeDetails, peerNodes);
+      }
       omRatisServer.start();
 
       LOG.info("OzoneManager Ratis server started at port {}",
           omRatisServer.getServerPort());
+    } else {
+      omRatisServer = null;
+    }
+  }
 
-      omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
-          omId, omRatisServer.getRaftGroup(), configuration);
+  /**
+   * If Ratis is enabled, creates the Ratis client (if absent) and connects it.
+   */
+  private void startRatisClient() throws IOException {
+    // This is a temporary check. Once fully implemented, all OM state change
+    // should go through Ratis - be it standalone (for non-HA) or replicated
+    // (for HA).
+    isRatisEnabled = configuration.getBoolean(
+      OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
+      OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
+    if (isRatisEnabled) {
+      if (omRatisClient == null) {
+        omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
+            omNodeDetails.getOMNodeId(), omRatisServer.getRaftGroup(),
+            configuration);
+      }
       omRatisClient.connect();
     } else {
-      omRatisServer = null;
       omRatisClient = null;
     }
-
-    final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY,
-        OZONE_OM_HANDLER_COUNT_DEFAULT);
-    RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
-        ProtobufRpcEngine.class);
-
-    BlockingService omService = newReflectiveBlockingService(
-        new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisClient,
-            omRatisEnabled));
-    return startRpcServer(configuration, omNodeRpcAddr,
-        OzoneManagerProtocolPB.class, omService,
-        handlerCount);
   }
 
   /**
@@ -970,6 +1178,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       if (omRatisServer != null) {
         omRatisServer.stop();
       }
+      if (omRatisClient != null) {
+        omRatisClient.close();
+      }
       isOmRpcServerRunning = false;
       keyManager.stop();
       stopSecretManager();
@@ -2188,4 +2399,12 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public static void setTestSecureOmFlag(boolean testSecureOmFlag) {
     OzoneManager.testSecureOmFlag = testSecureOmFlag;
   }
+
+  public String getOMNodId() {
+    return omNodeDetails.getOMNodeId();
+  }
+
+  public String getOMServiceId() {
+    return omNodeDetails.getOMServiceId();
+  }
 }

+ 11 - 24
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java

@@ -19,12 +19,10 @@ package org.apache.hadoop.ozone.om.ratis;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OmUtils;
@@ -57,7 +55,7 @@ public final class OzoneManagerRatisClient implements Closeable {
   private final RaftGroup raftGroup;
   private final String omID;
   private final RpcType rpcType;
-  private final AtomicReference<RaftClient> client = new AtomicReference<>();
+  private RaftClient raftClient;
   private final RetryPolicy retryPolicy;
   private final Configuration conf;
 
@@ -101,32 +99,21 @@ public final class OzoneManagerRatisClient implements Closeable {
     // maxOutstandingRequests so as to set the upper bound on max no of async
     // requests to be handled by raft client
 
-    if (!client.compareAndSet(null, OMRatisHelper.newRaftClient(
-        rpcType, omID, raftGroup, retryPolicy, conf))) {
-      throw new IllegalStateException("Client is already connected.");
-    }
+    raftClient = OMRatisHelper.newRaftClient(rpcType, omID, raftGroup,
+        retryPolicy, conf);
   }
 
   @Override
   public void close() {
-    final RaftClient c = client.getAndSet(null);
-    if (c != null) {
-      closeRaftClient(c);
+    if (raftClient != null) {
+      try {
+        raftClient.close();
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
+      }
     }
   }
 
-  private void closeRaftClient(RaftClient raftClient) {
-    try {
-      raftClient.close();
-    } catch (IOException e) {
-      throw new IllegalStateException(e);
-    }
-  }
-
-  private RaftClient getClient() {
-    return Objects.requireNonNull(client.get(), "client is null");
-  }
-
   /**
    * Sends a given request to server and gets the reply back.
    * @param request Request
@@ -188,7 +175,7 @@ public final class OzoneManagerRatisClient implements Closeable {
     boolean isReadOnlyRequest = OmUtils.isReadOnly(request);
     ByteString byteString = OMRatisHelper.convertRequestToByteString(request);
     LOG.debug("sendOMRequestAsync {} {}", isReadOnlyRequest, request);
-    return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) :
-        getClient().sendAsync(() -> byteString);
+    return isReadOnlyRequest ? raftClient.sendReadOnlyAsync(() -> byteString) :
+        raftClient.sendAsync(() -> byteString);
   }
 }

+ 88 - 51
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java

@@ -22,19 +22,17 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketAddress;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Objects;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMNodeDetails;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.RaftClientConfigKeys;
@@ -50,6 +48,7 @@ import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
@@ -71,28 +70,37 @@ public final class OzoneManagerRatisServer {
   private final RaftPeerId raftPeerId;
   private final OzoneManagerProtocol ozoneManager;
 
-  private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
-
-  private static long nextCallId() {
-    return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
-  }
-
-  private OzoneManagerRatisServer(OzoneManagerProtocol om, String omId,
-      InetAddress addr, int port, Configuration conf) throws IOException {
-    Objects.requireNonNull(omId, "omId is null");
+  /**
+   * Constructs an OM Ratis server.
+   * @param conf configuration
+   * @param om the OM instance starting the ratis server
+   * @param raftGroupIdStr raft group id string
+   * @param localRaftPeerId raft peer id of this Ratis server
+   * @param addr address of the ratis server
+   * @param raftPeers peer nodes in the raft ring
+   * @throws IOException
+   */
+  private OzoneManagerRatisServer(Configuration conf, OzoneManagerProtocol om,
+      String raftGroupIdStr, RaftPeerId localRaftPeerId,
+      InetSocketAddress addr, List<RaftPeer> raftPeers)
+      throws IOException {
     this.ozoneManager = om;
-    this.port = port;
-    this.omRatisAddress = new InetSocketAddress(addr.getHostAddress(), port);
+    this.omRatisAddress = addr;
+    this.port = addr.getPort();
     RaftProperties serverProperties = newRaftProperties(conf);
 
-    // TODO: When implementing replicated OM ratis servers, RaftGroupID
-    // should be the same across all the OMs. Add all the OM servers as Raft
-    // Peers.
-    this.raftGroupId = RaftGroupId.randomId();
-    this.raftPeerId = RaftPeerId.getRaftPeerId(omId);
+    this.raftPeerId = localRaftPeerId;
+    this.raftGroupId = RaftGroupId.valueOf(
+        ByteString.copyFromUtf8(raftGroupIdStr));
+    this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
+
+    StringBuilder raftPeersStr = new StringBuilder();
+    for (RaftPeer peer : raftPeers) {
+      raftPeersStr.append(", ").append(peer.getAddress());
+    }
+    LOG.info("Instantiating OM Ratis server with GroupID: {} and " +
+        "Raft Peers: {}", raftGroupIdStr, raftPeersStr.toString().substring(2));
 
-    RaftPeer raftPeer = new RaftPeer(raftPeerId, omRatisAddress);
-    this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeer);
     this.server = RaftServer.newBuilder()
         .setServerId(this.raftPeerId)
         .setGroup(this.raftGroup)
@@ -101,31 +109,42 @@ public final class OzoneManagerRatisServer {
         .build();
   }
 
+  /**
+   * Creates an instance of OzoneManagerRatisServer.
+   */
   public static OzoneManagerRatisServer newOMRatisServer(
-      OzoneManagerProtocol om, String omId, InetAddress omAddress,
-      Configuration ozoneConf) throws IOException {
-    int localPort = ozoneConf.getInt(
-        OMConfigKeys.OZONE_OM_RATIS_PORT_KEY,
-        OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT);
-
-    // Get an available port on current node and
-    // use that as the container port
-    if (ozoneConf.getBoolean(
-        OMConfigKeys.OZONE_OM_RATIS_RANDOM_PORT_KEY,
-        OMConfigKeys.OZONE_OM_RATIS_RANDOM_PORT_KEY_DEFAULT)) {
-      try (ServerSocket socket = new ServerSocket()) {
-        socket.setReuseAddress(true);
-        SocketAddress address = new InetSocketAddress(0);
-        socket.bind(address);
-        localPort = socket.getLocalPort();
-        LOG.info("Found a free port for the OM Ratis server : {}", localPort);
-      } catch (IOException e) {
-        LOG.error("Unable find a random free port for the server, "
-            + "fallback to use default port {}", localPort, e);
-      }
+      Configuration ozoneConf, OzoneManagerProtocol om,
+      OMNodeDetails omNodeDetails, List<OMNodeDetails> peerNodes)
+      throws IOException {
+
+    // RaftGroupId is the omServiceId
+    String omServiceId = omNodeDetails.getOMServiceId();
+
+    String omNodeId = omNodeDetails.getOMNodeId();
+    RaftPeerId localRaftPeerId = RaftPeerId.getRaftPeerId(omNodeId);
+
+    InetSocketAddress ratisAddr = new InetSocketAddress(
+        omNodeDetails.getAddress(), omNodeDetails.getRatisPort());
+
+    RaftPeer localRaftPeer = new RaftPeer(localRaftPeerId, ratisAddr);
+
+    List<RaftPeer> raftPeers = new ArrayList<>();
+    // Add this Ratis server to the Ratis ring
+    raftPeers.add(localRaftPeer);
+
+    for (OMNodeDetails peerInfo : peerNodes) {
+      String peerNodeId = peerInfo.getOMNodeId();
+      InetSocketAddress peerRatisAddr = new InetSocketAddress(
+          peerInfo.getAddress(), peerInfo.getRatisPort());
+      RaftPeerId raftPeerId = RaftPeerId.valueOf(peerNodeId);
+      RaftPeer raftPeer = new RaftPeer(raftPeerId, peerRatisAddr);
+
+      // Add other OM nodes belonging to the same OM service to the Ratis ring
+      raftPeers.add(raftPeer);
     }
-    return new OzoneManagerRatisServer(om, omId, omAddress, localPort,
-        ozoneConf);
+
+    return new OzoneManagerRatisServer(ozoneConf, om, omServiceId,
+        localRaftPeerId, ratisAddr, raftPeers);
   }
 
   public RaftGroup getRaftGroup() {
@@ -139,6 +158,10 @@ public final class OzoneManagerRatisServer {
     return  new OzoneManagerStateMachine(ozoneManager);
   }
 
+  /**
+   * Start the Ratis server.
+   * @throws IOException
+   */
   public void start() throws IOException {
     LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
         server.getId(), port);
@@ -266,11 +289,6 @@ public final class OzoneManagerRatisServer {
 
     // TODO: set max write buffer size
 
-    /**
-     * TODO: set following ratis leader election related configs when
-     * replicated ratis server is implemented.
-     * 1. node failure timeout
-     */
     // Set the ratis leader election timeout
     TimeUnit leaderElectionMinTimeoutUnit =
         OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
@@ -288,6 +306,20 @@ public final class OzoneManagerRatisServer {
     RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
         TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
 
+    TimeUnit nodeFailureTimeoutUnit =
+        OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+            .getUnit();
+    long nodeFailureTimeoutDuration = conf.getTimeDuration(
+        OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
+        OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+            .getDuration(), nodeFailureTimeoutUnit);
+    final TimeDuration nodeFailureTimeout = TimeDuration.valueOf(
+        nodeFailureTimeoutDuration, nodeFailureTimeoutUnit);
+    RaftServerConfigKeys.setLeaderElectionTimeout(properties,
+        nodeFailureTimeout);
+    RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
+        nodeFailureTimeout);
+
     /**
      * TODO: when ratis snapshots are implemented, set snapshot threshold and
      * queue size.
@@ -305,6 +337,11 @@ public final class OzoneManagerRatisServer {
     return server.getLifeCycleState();
   }
 
+  @VisibleForTesting
+  public RaftPeerId getRaftPeerId() {
+    return this.raftPeerId;
+  }
+
   /**
    * Get the local directory where ratis logs will be stored.
    */

+ 19 - 4
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java

@@ -19,14 +19,19 @@
 package org.apache.hadoop.ozone.om.ratis;
 
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMNodeDetails;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -64,8 +69,20 @@ public class TestOzoneManagerRatisServer {
     conf.setTimeDuration(
         OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
         LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
-    omRatisServer = OzoneManagerRatisServer.newOMRatisServer(null, omID,
-        InetAddress.getLocalHost(), conf);
+    int ratisPort = conf.getInt(
+        OMConfigKeys.OZONE_OM_RATIS_PORT_KEY,
+        OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT);
+    InetSocketAddress rpcAddress = new InetSocketAddress(
+        InetAddress.getLocalHost(), 0);
+    OMNodeDetails omNodeDetails = new OMNodeDetails.Builder()
+        .setRpcAddress(rpcAddress)
+        .setRatisPort(ratisPort)
+        .setOMNodeId(omID)
+        .setOMServiceId(OzoneConsts.OM_SERVICE_ID_DEFAULT)
+        .build();
+    // Starts a single node Ratis server
+    omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, null,
+      omNodeDetails, Collections.emptyList());
     omRatisServer.start();
     omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(omID,
         omRatisServer.getRaftGroup(), conf);
@@ -94,8 +111,6 @@ public class TestOzoneManagerRatisServer {
   /**
    * Submit any request to OM Ratis server and check that the dummy response
    * message is received.
-   * TODO: Once state machine is implemented, submitting a request to Ratis
-   * server should result in a valid response.
    */
   @Test
   public void testSubmitRatisRequest() throws Exception {