瀏覽代碼

HDDS-1371. OMSnapshotProvider to download DB checkpoint from leader OM. (#703)

Hanisha Koneru 6 年之前
父節點
當前提交
0b115b60b0
共有 17 個文件被更改,包括 723 次插入55 次删除
  1. 11 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
  2. 16 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBCheckpoint.java
  3. 11 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RocksDBCheckpoint.java
  4. 41 8
      hadoop-hdds/common/src/main/resources/ozone-default.xml
  5. 3 1
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
  6. 105 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
  7. 21 0
      hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
  8. 10 6
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
  9. 125 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java
  10. 25 1
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
  11. 39 2
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMNodeDetails.java
  12. 71 17
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
  13. 7 2
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java
  14. 2 15
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
  15. 210 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java
  16. 23 0
      hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/package-info.java
  17. 3 3
      hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java

+ 11 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java

@@ -76,6 +76,12 @@ public final class OzoneConsts {
   public static final String OZONE_USER = "user";
   public static final String OZONE_REQUEST = "request";
 
+  // OM Http server endpoints
+  public static final String OZONE_OM_SERVICE_LIST_HTTP_ENDPOINT =
+      "/serviceList";
+  public static final String OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT =
+      "/dbCheckpoint";
+
   // Ozone File System scheme
   public static final String OZONE_URI_SCHEME = "o3fs";
 
@@ -286,4 +292,9 @@ public final class OzoneConsts {
 
   // OM Ratis snapshot file to store the last applied index
   public static final String OM_RATIS_SNAPSHOT_INDEX = "ratisSnapshotIndex";
+
+  // OM Http request parameter to be used while downloading DB checkpoint
+  // from OM leader to follower
+  public static final String OM_RATIS_SNAPSHOT_BEFORE_DB_CHECKPOINT =
+      "snapshotBeforeCheckpoint";
 }

+ 16 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBCheckpoint.java

@@ -55,4 +55,20 @@ public interface DBCheckpoint {
    */
   void cleanupCheckpoint() throws IOException;
 
+  /**
+   * Set the OM Ratis snapshot index corresponding to the OM DB checkpoint.
+   * The snapshot index is the latest snapshot index saved by ratis
+   * snapshots. It is not guaranteed to be the last ratis index applied to
+   * the OM DB state.
+   * @param omRatisSnapshotIndex the saved ratis snapshot index
+   */
+  void setRatisSnapshotIndex(long omRatisSnapshotIndex);
+
+  /**
+   * Get the OM Ratis snapshot index corresponding to the OM DB checkpoint.
+   * The ratis snapshot index indicates upto which index is definitely
+   * included in the DB checkpoint. It is not guaranteed to be the last ratis
+   * log index applied to the DB checkpoint.
+   */
+  long getRatisSnapshotIndex();
 }

+ 11 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RocksDBCheckpoint.java

@@ -38,6 +38,7 @@ public class RocksDBCheckpoint implements DBCheckpoint {
   private long checkpointTimestamp = System.currentTimeMillis();
   private long latestSequenceNumber = -1;
   private long checkpointCreationTimeTaken = 0L;
+  private long ratisSnapshotIndex = 0L;
 
   public RocksDBCheckpoint(Path checkpointLocation) {
     this.checkpointLocation = checkpointLocation;
@@ -78,4 +79,14 @@ public class RocksDBCheckpoint implements DBCheckpoint {
     LOG.debug("Cleaning up checkpoint at " + checkpointLocation.toString());
     FileUtils.deleteDirectory(checkpointLocation.toFile());
   }
+
+  @Override
+  public void setRatisSnapshotIndex(long omRatisSnapshotIndex) {
+    this.ratisSnapshotIndex = omRatisSnapshotIndex;
+  }
+
+  @Override
+  public long getRatisSnapshotIndex() {
+    return ratisSnapshotIndex;
+  }
 }

+ 41 - 8
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -1585,6 +1585,8 @@
       logs. If this is not set then default metadata dirs is used. A warning
       will be logged if this not set. Ideally, this should be mapped to a
       fast disk like an SSD.
+      If undefined, OM ratis storage dir will fallback to ozone.metadata.dirs.
+      This fallback approach is not recommended for production environments.
     </description>
   </property>
 
@@ -1703,6 +1705,45 @@
       .</description>
   </property>
 
+  <property>
+    <name>ozone.om.ratis.snapshot.dir</name>
+    <value/>
+    <tag>OZONE, OM, STORAGE, MANAGEMENT, RATIS</tag>
+    <description>This directory is used for storing OM's snapshot
+      related files like the ratisSnapshotIndex and DB checkpoint from leader
+      OM.
+      If undefined, OM snapshot dir will fallback to ozone.om.ratis.storage.dir.
+      This fallback approach is not recommended for production environments.
+    </description>
+  </property>
+  <property>
+    <name>ozone.om.snapshot.provider.socket.timeout</name>
+    <value>5000s</value>
+    <tag>OZONE, OM, HA, MANAGEMENT</tag>
+    <description>
+      Socket timeout for HTTP call made by OM Snapshot Provider to request
+      OM snapshot from OM Leader.
+    </description>
+  </property>
+  <property>
+    <name>ozone.om.snapshot.provider.connection.timeout</name>
+    <value>5000s</value>
+    <tag>OZONE, OM, HA, MANAGEMENT</tag>
+    <description>
+      Connection timeout for HTTP call made by OM Snapshot Provider to request
+      OM snapshot from OM Leader.
+    </description>
+  </property>
+  <property>
+    <name>ozone.om.snapshot.provider.request.timeout</name>
+    <value>5000ms</value>
+    <tag>OZONE, OM, HA, MANAGEMENT</tag>
+    <description>
+      Connection request timeout for HTTP call made by OM Snapshot Provider to
+      request OM snapshot from OM Leader.
+    </description>
+  </property>
+
   <property>
     <name>ozone.acl.authorizer.class</name>
     <value>org.apache.hadoop.ozone.security.acl.OzoneAccessAuthorizer</value>
@@ -2346,14 +2387,6 @@
       OM snapshot.
     </description>
   </property>
-  <property>
-    <name>recon.om.socket.timeout</name>
-    <value>5s</value>
-    <tag>OZONE, RECON, OM</tag>
-    <description>
-      Socket timeout for HTTP call made by Recon to request OM snapshot.
-    </description>
-  </property>
   <property>
     <name>recon.om.snapshot.task.initial.delay</name>
     <value>1m</value>

+ 3 - 1
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java

@@ -99,6 +99,7 @@ import java.util.stream.Collectors;
 
 import static java.net.HttpURLConnection.HTTP_CREATED;
 import static java.net.HttpURLConnection.HTTP_OK;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_SERVICE_LIST_HTTP_ENDPOINT;
 
 /**
  * Ozone Client REST protocol implementation. It uses REST protocol to
@@ -190,7 +191,8 @@ public class RestClient implements ClientProtocol {
               " details on configuring Ozone.");
     }
 
-    HttpGet httpGet = new HttpGet("http://" + httpAddress + "/serviceList");
+    HttpGet httpGet = new HttpGet("http://" + httpAddress +
+        OZONE_OM_SERVICE_LIST_HTTP_ENDPOINT);
     HttpEntity entity = executeHttpRequest(httpGet);
     try {
       String serviceListJson = EntityUtils.toString(entity);

+ 105 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java

@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Collection;
@@ -34,12 +35,15 @@ import java.util.Collections;
 import java.util.Optional;
 import java.util.zip.GZIPOutputStream;
 
+import com.google.common.base.Strings;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.compress.utils.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.ScmUtils;
+import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -48,7 +52,11 @@ import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
 import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_BIND_HOST_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTPS_BIND_HOST_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTPS_BIND_PORT_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_BIND_HOST_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_BIND_PORT_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_PORT_DEFAULT;
@@ -366,4 +374,101 @@ public final class OmUtils {
     }
   }
 
+  /**
+   * If a OM conf is only set with key suffixed with OM Node ID, return the
+   * set value.
+   * @return null if base conf key is set, otherwise the value set for
+   * key suffixed with Node ID.
+   */
+  public static String getConfSuffixedWithOMNodeId(Configuration conf,
+      String confKey, String omServiceID, String omNodeId) {
+    String confValue = conf.getTrimmed(confKey);
+    if (StringUtils.isNotEmpty(confValue)) {
+      return null;
+    }
+    String suffixedConfKey = OmUtils.addKeySuffixes(
+        confKey, omServiceID, omNodeId);
+    confValue = conf.getTrimmed(suffixedConfKey);
+    if (StringUtils.isNotEmpty(confValue)) {
+      return confValue;
+    }
+    return null;
+  }
+
+  /**
+   * Returns the http address of peer OM node.
+   * @param conf Configuration
+   * @param omNodeId peer OM node ID
+   * @param omNodeHostAddr peer OM node host address
+   * @return http address of peer OM node in the format <hostName>:<port>
+   */
+  public static String getHttpAddressForOMPeerNode(Configuration conf,
+      String omServiceId, String omNodeId, String omNodeHostAddr) {
+    final Optional<String> bindHost = getHostNameFromConfigKeys(conf,
+        addKeySuffixes(OZONE_OM_HTTP_BIND_HOST_KEY, omServiceId, omNodeId));
+
+    final Optional<Integer> addressPort = getPortNumberFromConfigKeys(conf,
+        addKeySuffixes(OZONE_OM_HTTP_ADDRESS_KEY, omServiceId, omNodeId));
+
+    final Optional<String> addressHost = getHostNameFromConfigKeys(conf,
+        addKeySuffixes(OZONE_OM_HTTP_ADDRESS_KEY, omServiceId, omNodeId));
+
+    String hostName = bindHost.orElse(addressHost.orElse(omNodeHostAddr));
+
+    return hostName + ":" + addressPort.orElse(OZONE_OM_HTTP_BIND_PORT_DEFAULT);
+  }
+
+  /**
+   * Returns the https address of peer OM node.
+   * @param conf Configuration
+   * @param omNodeId peer OM node ID
+   * @param omNodeHostAddr peer OM node host address
+   * @return https address of peer OM node in the format <hostName>:<port>
+   */
+  public static String getHttpsAddressForOMPeerNode(Configuration conf,
+      String omServiceId, String omNodeId, String omNodeHostAddr) {
+    final Optional<String> bindHost = getHostNameFromConfigKeys(conf,
+        addKeySuffixes(OZONE_OM_HTTPS_BIND_HOST_KEY, omServiceId, omNodeId));
+
+    final Optional<Integer> addressPort = getPortNumberFromConfigKeys(conf,
+        addKeySuffixes(OZONE_OM_HTTPS_ADDRESS_KEY, omServiceId, omNodeId));
+
+    final Optional<String> addressHost = getHostNameFromConfigKeys(conf,
+        addKeySuffixes(OZONE_OM_HTTPS_ADDRESS_KEY, omServiceId, omNodeId));
+
+    String hostName = bindHost.orElse(addressHost.orElse(omNodeHostAddr));
+
+    return hostName + ":" +
+        addressPort.orElse(OZONE_OM_HTTPS_BIND_PORT_DEFAULT);
+  }
+
+  /**
+   * Get the local directory where ratis logs will be stored.
+   */
+  public static String getOMRatisDirectory(Configuration conf) {
+    String storageDir = conf.get(OMConfigKeys.OZONE_OM_RATIS_STORAGE_DIR);
+
+    if (Strings.isNullOrEmpty(storageDir)) {
+      storageDir = HddsServerUtil.getDefaultRatisDirectory(conf);
+    }
+    return storageDir;
+  }
+
+  public static String getOMRatisSnapshotDirectory(Configuration conf) {
+    String snapshotDir = conf.get(OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_DIR);
+
+    if (Strings.isNullOrEmpty(snapshotDir)) {
+      snapshotDir = Paths.get(getOMRatisDirectory(conf),
+          "snapshot").toString();
+    }
+    return snapshotDir;
+  }
+
+  public static File createOMDir(String dirPath) {
+    File dirFile = new File(dirPath);
+    if (!dirFile.exists() && !dirFile.mkdirs()) {
+      throw new IllegalArgumentException("Unable to create path: " + dirFile);
+    }
+    return dirFile;
+  }
 }

+ 21 - 0
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java

@@ -184,6 +184,27 @@ public final class OMConfigKeys {
       OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
       = TimeDuration.valueOf(15, TimeUnit.SECONDS);
 
+  // OM SnapshotProvider configurations
+  public static final String OZONE_OM_RATIS_SNAPSHOT_DIR =
+      "ozone.om.ratis.snapshot.dir";
+  public static final String OZONE_OM_SNAPSHOT_PROVIDER_SOCKET_TIMEOUT_KEY =
+      "ozone.om.snapshot.provider.socket.timeout";
+  public static final TimeDuration
+      OZONE_OM_SNAPSHOT_PROVIDER_SOCKET_TIMEOUT_DEFAULT =
+      TimeDuration.valueOf(5000, TimeUnit.MILLISECONDS);
+
+  public static final String OZONE_OM_SNAPSHOT_PROVIDER_CONNECTION_TIMEOUT_KEY =
+      "ozone.om.snapshot.provider.connection.timeout";
+  public static final TimeDuration
+      OZONE_OM_SNAPSHOT_PROVIDER_CONNECTION_TIMEOUT_DEFAULT =
+      TimeDuration.valueOf(5000, TimeUnit.MILLISECONDS);
+
+  public static final String OZONE_OM_SNAPSHOT_PROVIDER_REQUEST_TIMEOUT_KEY =
+      "ozone.om.snapshot.provider.request.timeout";
+  public static final TimeDuration
+      OZONE_OM_SNAPSHOT_PROVIDER_REQUEST_TIMEOUT_DEFAULT =
+      TimeDuration.valueOf(5000, TimeUnit.MILLISECONDS);
+
   public static final String OZONE_OM_KERBEROS_KEYTAB_FILE_KEY = "ozone.om."
       + "kerberos.keytab.file";
   public static final String OZONE_OM_KERBEROS_PRINCIPAL_KEY = "ozone.om"

+ 10 - 6
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java

@@ -200,6 +200,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
             // Set nodeId
             String nodeId = nodeIdBaseStr + i;
             conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId);
+            // Set the OM http(s) address to null so that the cluster picks
+            // up the address set with service ID and node ID in initHAConfig
+            conf.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, "");
+            conf.set(OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, "");
 
             // Set metadata/DB dir base path
             String metaDirPath = path + "/" + nodeId;
@@ -207,11 +211,6 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
             OMStorage omStore = new OMStorage(conf);
             initializeOmStorage(omStore);
 
-            // Set HTTP address to the rpc port + 2
-            int httpPort = basePort + (6*i) - 4;
-            conf.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY,
-                "127.0.0.1:" + httpPort);
-
             OzoneManager om = OzoneManager.createOm(null, conf);
             om.setCertClient(certClient);
             omMap.put(nodeId, om);
@@ -261,11 +260,16 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
         omNodesKeyValue.append(",").append(omNodeId);
         String omAddrKey = OmUtils.addKeySuffixes(
             OMConfigKeys.OZONE_OM_ADDRESS_KEY, omServiceId, omNodeId);
+        String omHttpAddrKey = OmUtils.addKeySuffixes(
+            OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, omServiceId, omNodeId);
+        String omHttpsAddrKey = OmUtils.addKeySuffixes(
+            OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, omServiceId, omNodeId);
         String omRatisPortKey = OmUtils.addKeySuffixes(
             OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, omServiceId, omNodeId);
 
         conf.set(omAddrKey, "127.0.0.1:" + port);
-        // Reserve port+2 for OMs HTTP server
+        conf.set(omHttpAddrKey, "127.0.0.1:" + (port + 2));
+        conf.set(omHttpsAddrKey, "127.0.0.1:" + (port + 3));
         conf.setInt(omRatisPortKey, port + 4);
       }
 

+ 125 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOzoneManagerSnapshotProvider.java

@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.utils.db.DBCheckpoint;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.UUID;
+
+/**
+ * Test OM's snapshot provider service.
+ */
+public class TestOzoneManagerSnapshotProvider {
+
+  private MiniOzoneHAClusterImpl cluster = null;
+  private ObjectStore objectStore;
+  private OzoneConfiguration conf;
+  private String clusterId;
+  private String scmId;
+  private int numOfOMs = 3;
+
+  @Rule
+  public Timeout timeout = new Timeout(300_000);
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   */
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
+    conf.setBoolean(OMConfigKeys.OZONE_OM_HTTP_ENABLED_KEY, true);
+    cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
+        .setClusterId(clusterId)
+        .setScmId(scmId)
+        .setOMServiceId("om-service-test1")
+        .setNumOfOzoneManagers(numOfOMs)
+        .build();
+    cluster.waitForClusterToBeReady();
+    objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore();
+  }
+
+  /**
+   * Shutdown MiniDFSCluster.
+   */
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testDownloadCheckpoint() throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner(userName)
+        .setAdmin(adminName)
+        .build();
+
+    objectStore.createVolume(volumeName, createVolumeArgs);
+    OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+
+    retVolumeinfo.createBucket(bucketName);
+    OzoneBucket ozoneBucket = retVolumeinfo.getBucket(bucketName);
+
+    String leaderOMNodeId = objectStore.getClientProxy().getOMProxyProvider()
+        .getCurrentProxyOMNodeId();
+    OzoneManager ozoneManager = cluster.getOzoneManager(leaderOMNodeId);
+
+    // Get a follower OM
+    String followerNodeId = ozoneManager.getPeerNodes().get(0).getOMNodeId();
+    OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
+
+    // Download latest checkpoint from leader OM to follower OM
+    DBCheckpoint omSnapshot = followerOM.getOmSnapshotProvider()
+        .getOzoneManagerDBSnapshot(leaderOMNodeId);
+
+    long leaderSnapshotIndex = ozoneManager.loadRatisSnapshotIndex();
+    long downloadedSnapshotIndex = omSnapshot.getRatisSnapshotIndex();
+
+    // The snapshot index downloaded from leader OM should match the ratis
+    // snapshot index on the leader OM
+    Assert.assertEquals("The snapshot index downloaded from leader OM does " +
+        "not match its ratis snapshot index",
+        leaderSnapshotIndex, downloadedSnapshotIndex);
+  }
+}

+ 25 - 1
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ozone.om;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_BEFORE_DB_CHECKPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_INDEX;
 import static org.apache.hadoop.ozone.OzoneConsts.
     OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
 
@@ -54,6 +56,7 @@ public class OMDBCheckpointServlet extends HttpServlet {
       LoggerFactory.getLogger(OMDBCheckpointServlet.class);
   private static final long serialVersionUID = 1L;
 
+  private transient OzoneManager om;
   private transient DBStore omDbStore;
   private transient OMMetrics omMetrics;
   private transient DataTransferThrottler throttler = null;
@@ -61,7 +64,7 @@ public class OMDBCheckpointServlet extends HttpServlet {
   @Override
   public void init() throws ServletException {
 
-    OzoneManager om = (OzoneManager) getServletContext()
+    om = (OzoneManager) getServletContext()
         .getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE);
 
     if (om == null) {
@@ -110,6 +113,24 @@ public class OMDBCheckpointServlet extends HttpServlet {
         flush = Boolean.valueOf(flushParam);
       }
 
+      boolean takeRatisSnapshot = false;
+      String snapshotBeforeCheckpointParam =
+          request.getParameter(OM_RATIS_SNAPSHOT_BEFORE_DB_CHECKPOINT);
+      if (StringUtils.isNotEmpty(snapshotBeforeCheckpointParam)) {
+        takeRatisSnapshot = Boolean.valueOf(snapshotBeforeCheckpointParam);
+      }
+
+      long ratisSnapshotIndex;
+      if (takeRatisSnapshot) {
+        // If OM follower is downloading the checkpoint, we should save a
+        // ratis snapshot first. This step also included flushing the OM DB.
+        // Hence, we can set flush to false.
+        flush = false;
+        ratisSnapshotIndex = om.saveRatisSnapshot();
+      } else {
+        ratisSnapshotIndex = om.loadRatisSnapshotIndex();
+      }
+
       DBCheckpoint checkpoint = omDbStore.getCheckpoint(flush);
       if (checkpoint == null) {
         LOG.error("Unable to process metadata snapshot request. " +
@@ -136,6 +157,9 @@ public class OMDBCheckpointServlet extends HttpServlet {
       response.setHeader("Content-Disposition",
           "attachment; filename=\"" +
               checkPointTarFile.getName() + "\"");
+      // Ratis snapshot index used when downloading DB checkpoint to OM follower
+      response.setHeader(OM_RATIS_SNAPSHOT_INDEX,
+          String.valueOf(ratisSnapshotIndex));
 
       checkpointFileInputStream = new FileInputStream(checkPointTarFile);
       start = Instant.now();

+ 39 - 2
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMNodeDetails.java

@@ -17,11 +17,16 @@
 
 package org.apache.hadoop.ozone.om;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.net.NetUtils;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_BEFORE_DB_CHECKPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT;
+
 /**
  * This class stores OM node details.
  */
@@ -31,17 +36,22 @@ public final class OMNodeDetails {
   private InetSocketAddress rpcAddress;
   private int rpcPort;
   private int ratisPort;
+  private String httpAddress;
+  private String httpsAddress;
 
   /**
    * Constructs OMNodeDetails object.
    */
   private OMNodeDetails(String serviceId, String nodeId,
-      InetSocketAddress rpcAddr, int rpcPort, int ratisPort) {
+      InetSocketAddress rpcAddr, int rpcPort, int ratisPort,
+      String httpAddress, String httpsAddress) {
     this.omServiceId = serviceId;
     this.omNodeId = nodeId;
     this.rpcAddress = rpcAddr;
     this.rpcPort = rpcPort;
     this.ratisPort = ratisPort;
+    this.httpAddress = httpAddress;
+    this.httpsAddress = httpsAddress;
   }
 
   /**
@@ -53,6 +63,8 @@ public final class OMNodeDetails {
     private InetSocketAddress rpcAddress;
     private int rpcPort;
     private int ratisPort;
+    private String httpAddr;
+    private String httpsAddr;
 
     public Builder setRpcAddress(InetSocketAddress rpcAddr) {
       this.rpcAddress = rpcAddr;
@@ -75,9 +87,19 @@ public final class OMNodeDetails {
       return this;
     }
 
+    public Builder setHttpAddress(String httpAddress) {
+      this.httpAddr = httpAddress;
+      return this;
+    }
+
+    public Builder setHttpsAddress(String httpsAddress) {
+      this.httpsAddr = httpsAddress;
+      return this;
+    }
+
     public OMNodeDetails build() {
       return new OMNodeDetails(omServiceId, omNodeId, rpcAddress, rpcPort,
-          ratisPort);
+          ratisPort, httpAddr, httpsAddr);
     }
   }
 
@@ -108,4 +130,19 @@ public final class OMNodeDetails {
   public String getRpcAddressString() {
     return NetUtils.getHostPortString(rpcAddress);
   }
+
+  public String getOMDBCheckpointEnpointUrl(HttpConfig.Policy httpPolicy) {
+    if (httpPolicy.isHttpEnabled()) {
+      if (StringUtils.isNotEmpty(httpAddress)) {
+        return "http://" + httpAddress + OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT
+            + "?" + OM_RATIS_SNAPSHOT_BEFORE_DB_CHECKPOINT + "=true";
+      }
+    } else {
+      if (StringUtils.isNotEmpty(httpsAddress)) {
+        return "https://" + httpsAddress + OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT
+            + "?" + OM_RATIS_SNAPSHOT_BEFORE_DB_CHECKPOINT + "=true";
+      }
+    }
+    return null;
+  }
 }

+ 71 - 17
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java

@@ -78,6 +78,7 @@ import org.apache.hadoop.ozone.om.helpers.OmDeleteVolumeResponse;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeOwnerChangeResponse;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
+import org.apache.hadoop.ozone.om.snapshot.OzoneManagerSnapshotProvider;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -184,22 +185,17 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_FILE;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_METRICS_TEMP_FILE;
-
 import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_INDEX;
 import static org.apache.hadoop.ozone.OzoneConsts.RPC_PORT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
-import static org.apache.hadoop.ozone.om.OMConfigKeys
-    .OZONE_OM_HANDLER_COUNT_DEFAULT;
-import static org.apache.hadoop.ozone.om.OMConfigKeys
-    .OZONE_OM_HANDLER_COUNT_KEY;
-import static org.apache.hadoop.ozone.om.OMConfigKeys
-    .OZONE_OM_METRICS_SAVE_INTERVAL;
-import static org.apache.hadoop.ozone.om.OMConfigKeys
-    .OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
-
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODE_ID_KEY;
-import static org.apache.hadoop.ozone.om.OMConfigKeys
-    .OZONE_OM_RATIS_PORT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_PORT_KEY;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
@@ -208,10 +204,6 @@ import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKE
 import static org.apache.hadoop.ozone.protocol.proto
     .OzoneManagerProtocolProtos.OzoneManagerService
     .newReflectiveBlockingService;
-import static org.apache.hadoop.ozone.om.OMConfigKeys
-    .OZONE_OM_KERBEROS_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.ozone.om.OMConfigKeys
-    .OZONE_OM_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 /**
@@ -241,7 +233,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private RPC.Server omRpcServer;
   private InetSocketAddress omRpcAddress;
   private String omId;
-  private List<OMNodeDetails> peerNodes;
   private final OMMetadataManager metadataManager;
   private final VolumeManager volumeManager;
   private final BucketManager bucketManager;
@@ -273,7 +264,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private boolean isRatisEnabled;
   private OzoneManagerRatisServer omRatisServer;
   private OzoneManagerRatisClient omRatisClient;
+  private OzoneManagerSnapshotProvider omSnapshotProvider;
   private OMNodeDetails omNodeDetails;
+  private List<OMNodeDetails> peerNodes;
+  private File omRatisSnapshotDir;
   private final File ratisSnapshotFile;
   private long snapshotIndex;
 
@@ -319,6 +313,11 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     startRatisServer();
     startRatisClient();
 
+    if (peerNodes != null && !peerNodes.isEmpty()) {
+      this.omSnapshotProvider = new OzoneManagerSnapshotProvider(configuration,
+          omRatisSnapshotDir, peerNodes);
+    }
+
     this.ratisSnapshotFile = new File(omStorage.getCurrentDir(),
         OM_RATIS_SNAPSHOT_INDEX);
     this.snapshotIndex = loadRatisSnapshotIndex();
@@ -449,11 +448,17 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
           } else {
             // This OMNode belongs to same OM service as the current OMNode.
             // Add it to peerNodes list.
+            String httpAddr = OmUtils.getHttpAddressForOMPeerNode(conf,
+                serviceId, nodeId, addr.getHostName());
+            String httpsAddr = OmUtils.getHttpsAddressForOMPeerNode(conf,
+                serviceId, nodeId, addr.getHostName());
             OMNodeDetails peerNodeInfo = new OMNodeDetails.Builder()
                 .setOMServiceId(serviceId)
                 .setOMNodeId(nodeId)
                 .setRpcAddress(addr)
                 .setRatisPort(ratisPort)
+                .setHttpAddress(httpAddr)
+                .setHttpsAddress(httpsAddr)
                 .build();
             peerNodesList.add(peerNodeInfo);
           }
@@ -465,6 +470,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
         setOMNodeDetails(localOMServiceId, localOMNodeId, localRpcAddress,
             localRatisPort);
+
         this.peerNodes = peerNodesList;
 
         LOG.info("Found matching OM address with OMServiceId: {}, " +
@@ -530,6 +536,49 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     // Set this nodes OZONE_OM_ADDRESS_KEY to the discovered address.
     configuration.set(OZONE_OM_ADDRESS_KEY,
         NetUtils.getHostPortString(rpcAddress));
+
+    // Create Ratis storage dir
+    String omRatisDirectory = OmUtils.getOMRatisDirectory(configuration);
+    if (omRatisDirectory == null || omRatisDirectory.isEmpty()) {
+      throw new IllegalArgumentException(HddsConfigKeys.OZONE_METADATA_DIRS +
+          " must be defined.");
+    }
+    OmUtils.createOMDir(omRatisDirectory);
+
+    // Create Ratis snapshot dir
+    omRatisSnapshotDir = OmUtils.createOMDir(
+        OmUtils.getOMRatisSnapshotDirectory(configuration));
+
+    // Get and set Http(s) address of local node. If base config keys are
+    // not set, check for keys suffixed with OM serivce ID and node ID.
+    setOMNodeSpecificConfigs(serviceId, nodeId);
+  }
+
+  /**
+   * Check if any of the following configuration keys have been set using OM
+   * Node ID suffixed to the key. If yes, then set the base key with the
+   * configured valued.
+   *    1. {@link OMConfigKeys#OZONE_OM_HTTP_ADDRESS_KEY}
+   *    2. {@link OMConfigKeys#OZONE_OM_HTTPS_ADDRESS_KEY}
+   *    3. {@link OMConfigKeys#OZONE_OM_HTTP_BIND_HOST_KEY}
+   *    4. {@link OMConfigKeys#OZONE_OM_HTTPS_BIND_HOST_KEY}
+   */
+  private void setOMNodeSpecificConfigs(String omServiceId, String omNodeId) {
+    String[] confKeys = new String[] {
+        OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY,
+        OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY,
+        OMConfigKeys.OZONE_OM_HTTP_BIND_HOST_KEY,
+        OMConfigKeys.OZONE_OM_HTTPS_BIND_HOST_KEY};
+
+    for (String confKey : confKeys) {
+      String confValue = OmUtils.getConfSuffixedWithOMNodeId(
+          configuration, confKey, omServiceId, omNodeId);
+      if (confValue != null) {
+        LOG.info("Setting configuration key {} with value of key {}: {}",
+            confKey, OmUtils.addKeySuffixes(confKey, omNodeId), confValue);
+        configuration.set(confKey, confValue);
+      }
+    }
   }
 
   private KeyProviderCryptoExtension createKeyProviderExt(
@@ -1124,6 +1173,11 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     return omRatisServer;
   }
 
+  @VisibleForTesting
+  public OzoneManagerSnapshotProvider getOmSnapshotProvider() {
+    return omSnapshotProvider;
+  }
+
   @VisibleForTesting
   public InetSocketAddress getOmRpcServerAddr() {
     return omRpcAddress;

+ 7 - 2
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java

@@ -23,6 +23,9 @@ import org.apache.hadoop.hdds.server.BaseHttpServer;
 
 import java.io.IOException;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_SERVICE_LIST_HTTP_ENDPOINT;
+
 /**
  * HttpServer wrapper for the OzoneManager.
  */
@@ -31,8 +34,10 @@ public class OzoneManagerHttpServer extends BaseHttpServer {
   public OzoneManagerHttpServer(Configuration conf, OzoneManager om)
       throws IOException {
     super(conf, "ozoneManager");
-    addServlet("serviceList", "/serviceList", ServiceListJSONServlet.class);
-    addServlet("dbCheckpoint", "/dbCheckpoint", OMDBCheckpointServlet.class);
+    addServlet("serviceList", OZONE_OM_SERVICE_LIST_HTTP_ENDPOINT,
+        ServiceListJSONServlet.class);
+    addServlet("dbCheckpoint", OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT,
+        OMDBCheckpointServlet.class);
     getWebAppContext().setAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE, om);
   }
 

+ 2 - 15
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java

@@ -19,7 +19,6 @@
 package org.apache.hadoop.ozone.om.ratis;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -39,7 +38,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMNodeDetails;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -358,7 +357,7 @@ public final class OzoneManagerRatisServer {
     }
 
     // Set Ratis storage directory
-    String storageDir = getOMRatisDirectory(conf);
+    String storageDir = OmUtils.getOMRatisDirectory(conf);
     RaftServerConfigKeys.setStorageDirs(properties,
         Collections.singletonList(new File(storageDir)));
 
@@ -620,18 +619,6 @@ public final class OzoneManagerRatisServer {
     return this.raftPeerId;
   }
 
-  /**
-   * Get the local directory where ratis logs will be stored.
-   */
-  public static String getOMRatisDirectory(Configuration conf) {
-    String storageDir = conf.get(OMConfigKeys.OZONE_OM_RATIS_STORAGE_DIR);
-
-    if (Strings.isNullOrEmpty(storageDir)) {
-      storageDir = HddsServerUtil.getDefaultRatisDirectory(conf);
-    }
-    return storageDir;
-  }
-
   private UUID getRaftGroupIdFromOmServiceId(String omServiceId) {
     return UUID.nameUUIDFromBytes(omServiceId.getBytes(StandardCharsets.UTF_8));
   }

+ 210 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OzoneManagerSnapshotProvider.java

@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.ozone.om.OMNodeDetails;
+import org.apache.hadoop.utils.db.DBCheckpoint;
+import org.apache.hadoop.utils.db.RocksDBCheckpoint;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static java.net.HttpURLConnection.HTTP_CREATED;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_RATIS_SNAPSHOT_INDEX;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_PROVIDER_CONNECTION_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_PROVIDER_REQUEST_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_PROVIDER_REQUEST_TIMEOUT_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_PROVIDER_CONNECTION_TIMEOUT_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_PROVIDER_SOCKET_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_PROVIDER_SOCKET_TIMEOUT_KEY;
+
+/**
+ * OzoneManagerSnapshotProvider downloads the latest checkpoint from the
+ * leader OM and loads the checkpoint into State Machine.
+ */
+public class OzoneManagerSnapshotProvider {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneManagerSnapshotProvider.class);
+
+  private final File omSnapshotDir;
+  private Map<String, OMNodeDetails> peerNodesMap;
+  private final HttpConfig.Policy httpPolicy;
+  private final RequestConfig httpRequestConfig;
+  private CloseableHttpClient httpClient;
+
+  private static final String OM_SNAPSHOT_DB = "om.snapshot.db";
+
+  public OzoneManagerSnapshotProvider(Configuration conf,
+      File omRatisSnapshotDir, List<OMNodeDetails> peerNodes) {
+
+    LOG.info("Initializing OM Snapshot Provider");
+    this.omSnapshotDir = omRatisSnapshotDir;
+
+    this.peerNodesMap = new HashMap<>();
+    for (OMNodeDetails peerNode : peerNodes) {
+      this.peerNodesMap.put(peerNode.getOMNodeId(), peerNode);
+    }
+
+    this.httpPolicy = DFSUtil.getHttpPolicy(conf);
+    this.httpRequestConfig = getHttpRequestConfig(conf);
+  }
+
+  private RequestConfig getHttpRequestConfig(Configuration conf) {
+    TimeUnit socketTimeoutUnit =
+        OZONE_OM_SNAPSHOT_PROVIDER_SOCKET_TIMEOUT_DEFAULT.getUnit();
+    int socketTimeoutMS = (int) conf.getTimeDuration(
+        OZONE_OM_SNAPSHOT_PROVIDER_SOCKET_TIMEOUT_KEY,
+        OZONE_OM_SNAPSHOT_PROVIDER_SOCKET_TIMEOUT_DEFAULT.getDuration(),
+        socketTimeoutUnit);
+
+    TimeUnit connectionTimeoutUnit =
+        OZONE_OM_SNAPSHOT_PROVIDER_CONNECTION_TIMEOUT_DEFAULT.getUnit();
+    int connectionTimeoutMS = (int) conf.getTimeDuration(
+        OZONE_OM_SNAPSHOT_PROVIDER_CONNECTION_TIMEOUT_KEY,
+        OZONE_OM_SNAPSHOT_PROVIDER_CONNECTION_TIMEOUT_DEFAULT.getDuration(),
+        connectionTimeoutUnit);
+
+    TimeUnit requestTimeoutUnit =
+        OZONE_OM_SNAPSHOT_PROVIDER_REQUEST_TIMEOUT_DEFAULT.getUnit();
+    int requestTimeoutMS = (int) conf.getTimeDuration(
+        OZONE_OM_SNAPSHOT_PROVIDER_REQUEST_TIMEOUT_KEY,
+        OZONE_OM_SNAPSHOT_PROVIDER_REQUEST_TIMEOUT_DEFAULT.getDuration(),
+        requestTimeoutUnit);
+
+    RequestConfig requestConfig = RequestConfig.custom()
+        .setSocketTimeout(socketTimeoutMS)
+        .setConnectTimeout(connectionTimeoutMS)
+        .setConnectionRequestTimeout(requestTimeoutMS)
+        .build();
+
+    return requestConfig;
+  }
+
+  /**
+   * Create and return http client object.
+   */
+  private HttpClient getHttpClient() {
+    if (httpClient == null) {
+      httpClient = HttpClientBuilder
+          .create()
+          .setDefaultRequestConfig(httpRequestConfig)
+          .build();
+    }
+    return httpClient;
+  }
+
+  /**
+   * Close http client object.
+   */
+  private void closeHttpClient() throws IOException {
+    if (httpClient != null) {
+      httpClient.close();
+      httpClient = null;
+    }
+  }
+
+  /**
+   * Download the latest checkpoint from OM Leader via HTTP.
+   * @param leaderOMNodeID leader OM Node ID.
+   * @return the DB checkpoint (including the ratis snapshot index)
+   */
+  protected DBCheckpoint getOzoneManagerDBSnapshot(String leaderOMNodeID)
+      throws IOException {
+    String snapshotFileName = OM_SNAPSHOT_DB + "_" + System.currentTimeMillis();
+    File targetFile = new File(omSnapshotDir, snapshotFileName + ".tar.gz");
+
+    String omCheckpointUrl = peerNodesMap.get(leaderOMNodeID)
+        .getOMDBCheckpointEnpointUrl(httpPolicy);
+
+    LOG.info("Downloading latest checkpoint from Leader OM {}. Checkpoint " +
+        "URL: {}", leaderOMNodeID, omCheckpointUrl);
+
+    try {
+      HttpGet httpGet = new HttpGet(omCheckpointUrl);
+      HttpResponse response = getHttpClient().execute(httpGet);
+      int errorCode = response.getStatusLine().getStatusCode();
+      HttpEntity entity = response.getEntity();
+
+      if ((errorCode == HTTP_OK) || (errorCode == HTTP_CREATED)) {
+
+        Header header = response.getFirstHeader(OM_RATIS_SNAPSHOT_INDEX);
+        if (header == null) {
+          throw new IOException("The HTTP response header " +
+              OM_RATIS_SNAPSHOT_INDEX + " is missing.");
+        }
+
+        long snapshotIndex = Long.parseLong(header.getValue());
+
+        try (InputStream inputStream = entity.getContent()) {
+          FileUtils.copyInputStreamToFile(inputStream, targetFile);
+        }
+
+        // Untar the checkpoint file.
+        Path untarredDbDir = Paths.get(omSnapshotDir.getAbsolutePath(),
+            snapshotFileName);
+        FileUtil.unTar(targetFile, untarredDbDir.toFile());
+        FileUtils.deleteQuietly(targetFile);
+
+        LOG.info("Sucessfully downloaded latest checkpoint with snapshot " +
+            "index {} from leader OM: {}",  snapshotIndex, leaderOMNodeID);
+
+        RocksDBCheckpoint omCheckpoint = new RocksDBCheckpoint(untarredDbDir);
+        omCheckpoint.setRatisSnapshotIndex(snapshotIndex);
+        return omCheckpoint;
+      }
+
+      if (entity != null) {
+        throw new IOException("Unexpected exception when trying to reach " +
+            "OM to download latest checkpoint. Checkpoint URL: " +
+            omCheckpointUrl + ". Entity: " + EntityUtils.toString(entity));
+      } else {
+        throw new IOException("Unexpected null in http payload, while " +
+            "processing request to OM to download latest checkpoint. " +
+            "Checkpoint Url: " + omCheckpointUrl);
+      }
+    } finally {
+      closeHttpClient();
+    }
+  }
+}

+ 23 - 0
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/package-info.java

@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot;
+
+/**
+ * This package contains OM Ratis Snapshot related classes.
+ */

+ 3 - 3
hadoop-ozone/ozone-recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.recon.spi.impl;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT;
 import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OM_SNAPSHOT_DB;
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
 import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_REQUEST_TIMEOUT;
@@ -69,7 +70,6 @@ public class OzoneManagerServiceProviderImpl
   private static final Logger LOG =
       LoggerFactory.getLogger(OzoneManagerServiceProviderImpl.class);
 
-  private final String dbCheckpointEndPoint = "/dbCheckpoint";
   private final CloseableHttpClient httpClient;
   private File omSnapshotDBParentDir = null;
   private String omDBSnapshotUrl;
@@ -116,11 +116,11 @@ public class OzoneManagerServiceProviderImpl
         .build();
 
     omDBSnapshotUrl = "http://" + ozoneManagerHttpAddress +
-        dbCheckpointEndPoint;
+        OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT;
 
     if (ozoneSecurityEnabled) {
       omDBSnapshotUrl = "https://" + ozoneManagerHttpsAddress +
-          dbCheckpointEndPoint;
+          OZONE_OM_DB_CHECKPOINT_HTTP_ENDPOINT;
     }
 
     boolean flushParam = configuration.getBoolean(