Selaa lähdekoodia

HDFS-12868. Ozone: Service Discovery API. Contributed by Nanda Kumar.

Xiaoyu Yao 7 vuotta sitten
vanhempi
commit
75d8dd0e69

+ 0 - 4
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -124,10 +124,6 @@ public final class OzoneConfigKeys {
   public static final Class<? extends ClientProtocol>
       OZONE_CLIENT_PROTOCOL_REST = RestClient.class;
 
-  public static final String OZONE_REST_SERVERS = "ozone.rest.servers";
-  public static final String OZONE_REST_CLIENT_PORT = "ozone.rest.client.port";
-  public static final int OZONE_REST_CLIENT_PORT_DEFAULT = 9864;
-
   // This defines the overall connection limit for the connection pool used in
   // RestClient.
   public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX =

+ 196 - 70
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientFactory.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.client;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
@@ -34,6 +35,12 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_CLIENT_PROTOCOL_REST;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_CLIENT_PROTOCOL_RPC;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
+    .OZONE_KSM_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
+    .OZONE_KSM_HTTP_BIND_PORT_DEFAULT;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_PORT_DEFAULT;
 
 /**
  * Factory class to create different types of OzoneClients.
@@ -49,112 +56,240 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
  */
 public final class OzoneClientFactory {
 
-  private enum ClientType {
-    RPC, REST
-  }
+  private static final Logger LOG = LoggerFactory.getLogger(
+      OzoneClientFactory.class);
 
   /**
    * Private constructor, class is not meant to be initialized.
    */
   private OzoneClientFactory(){}
 
-  private static final Logger LOG = LoggerFactory.getLogger(
-      OzoneClientFactory.class);
-
-  private static Configuration configuration;
 
   /**
-   * Returns an OzoneClient which will use protocol defined through
-   * <code>ozone.client.protocol</code> to perform client operations.
+   * Constructs and return an OzoneClient with default configuration.
+   *
    * @return OzoneClient
+   *
    * @throws IOException
    */
   public static OzoneClient getClient() throws IOException {
-    return getClient(null);
+    LOG.info("Creating OzoneClient with default configuration.");
+    return getClient(new OzoneConfiguration());
   }
 
   /**
-   * Returns an OzoneClient which will use RPC protocol to perform
-   * client operations.
+   * Constructs and return an OzoneClient based on the configuration object.
+   * Protocol type is decided by <code>ozone.client.protocol</code>.
+   *
+   * @param config
+   *        Configuration to be used for OzoneClient creation
+   *
    * @return OzoneClient
+   *
    * @throws IOException
    */
-  public static OzoneClient getRpcClient() throws IOException {
-    return getClient(ClientType.RPC);
+  public static OzoneClient getClient(Configuration config)
+      throws IOException {
+    Preconditions.checkNotNull(config);
+    Class<? extends ClientProtocol> clazz = (Class<? extends ClientProtocol>)
+        config.getClass(OZONE_CLIENT_PROTOCOL, OZONE_CLIENT_PROTOCOL_RPC);
+    return getClient(getClientProtocol(clazz, config), config);
   }
 
   /**
-   * Returns an OzoneClient which will use REST protocol to perform
-   * client operations.
+   * Returns an OzoneClient which will use RPC protocol.
+   *
+   * @param ksmHost
+   *        hostname of KeySpaceManager to connect.
+   *
    * @return OzoneClient
+   *
    * @throws IOException
    */
-  public static OzoneClient getRestClient() throws IOException {
-    return getClient(ClientType.REST);
+  public static OzoneClient getRpcClient(String ksmHost)
+      throws IOException {
+    return getRpcClient(ksmHost, OZONE_KSM_PORT_DEFAULT,
+        new OzoneConfiguration());
   }
 
   /**
-   * Returns OzoneClient with protocol type set base on ClientType.
-   * @param clientType
+   * Returns an OzoneClient which will use RPC protocol.
+   *
+   * @param ksmHost
+   *        hostname of KeySpaceManager to connect.
+   *
+   * @param ksmRpcPort
+   *        RPC port of KeySpaceManager.
+   *
    * @return OzoneClient
+   *
    * @throws IOException
    */
-  private static OzoneClient getClient(ClientType clientType)
+  public static OzoneClient getRpcClient(String ksmHost, Integer ksmRpcPort)
       throws IOException {
-    OzoneClientInvocationHandler clientHandler =
-        new OzoneClientInvocationHandler(getProtocolClass(clientType));
-    ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
-        OzoneClientInvocationHandler.class.getClassLoader(),
-        new Class<?>[]{ClientProtocol.class}, clientHandler);
-    return new OzoneClient(configuration, proxy);
+    return getRpcClient(ksmHost, ksmRpcPort, new OzoneConfiguration());
   }
 
   /**
-   * Returns the configuration if it's already set, else creates a new
-   * {@link OzoneConfiguration} and returns it.
+   * Returns an OzoneClient which will use RPC protocol.
+   *
+   * @param ksmHost
+   *        hostname of KeySpaceManager to connect.
+   *
+   * @param ksmRpcPort
+   *        RPC port of KeySpaceManager.
+   *
+   * @param config
+   *        Configuration to be used for OzoneClient creation
    *
-   * @return Configuration
+   * @return OzoneClient
+   *
+   * @throws IOException
    */
-  private static synchronized Configuration getConfiguration() {
-    if(configuration == null) {
-      setConfiguration(new OzoneConfiguration());
-    }
-    return configuration;
+  public static OzoneClient getRpcClient(String ksmHost, Integer ksmRpcPort,
+                                         Configuration config)
+      throws IOException {
+    Preconditions.checkNotNull(ksmHost);
+    Preconditions.checkNotNull(ksmRpcPort);
+    Preconditions.checkNotNull(config);
+    config.set(OZONE_KSM_ADDRESS_KEY, ksmHost + ":" + ksmRpcPort);
+    return getRpcClient(config);
   }
 
   /**
-   * Based on the clientType, client protocol instance is created.
-   * If clientType is null, <code>ozone.client.protocol</code> property
-   * will be used to decide the protocol to be used.
-   * @param clientType type of client protocol to be created
-   * @return ClientProtocol implementation
+   * Returns an OzoneClient which will use RPC protocol.
+   *
+   * @param config
+   *        used for OzoneClient creation
+   *
+   * @return OzoneClient
+   *
    * @throws IOException
    */
-  private static ClientProtocol getProtocolClass(ClientType clientType)
+  public static OzoneClient getRpcClient(Configuration config)
+      throws IOException {
+    Preconditions.checkNotNull(config);
+    return getClient(getClientProtocol(OZONE_CLIENT_PROTOCOL_RPC, config),
+        config);
+  }
+
+  /**
+   * Returns an OzoneClient which will use REST protocol.
+   *
+   * @param ksmHost
+   *        hostname of KeySpaceManager to connect.
+   *
+   * @return OzoneClient
+   *
+   * @throws IOException
+   */
+  public static OzoneClient getRestClient(String ksmHost)
+      throws IOException {
+    return getRestClient(ksmHost, OZONE_KSM_HTTP_BIND_PORT_DEFAULT);
+  }
+
+  /**
+   * Returns an OzoneClient which will use REST protocol.
+   *
+   * @param ksmHost
+   *        hostname of KeySpaceManager to connect.
+   *
+   * @param ksmHttpPort
+   *        HTTP port of KeySpaceManager.
+   *
+   * @return OzoneClient
+   *
+   * @throws IOException
+   */
+  public static OzoneClient getRestClient(String ksmHost, Integer ksmHttpPort)
+      throws IOException {
+    return getRestClient(ksmHost, ksmHttpPort, new OzoneConfiguration());
+  }
+
+  /**
+   * Returns an OzoneClient which will use REST protocol.
+   *
+   * @param ksmHost
+   *        hostname of KeySpaceManager to connect.
+   *
+   * @param ksmHttpPort
+   *        HTTP port of KeySpaceManager.
+   *
+   * @param config
+   *        Configuration to be used for OzoneClient creation
+   *
+   * @return OzoneClient
+   *
+   * @throws IOException
+   */
+  public static OzoneClient getRestClient(String ksmHost, Integer ksmHttpPort,
+                                          Configuration config)
+      throws IOException {
+    Preconditions.checkNotNull(ksmHost);
+    Preconditions.checkNotNull(ksmHttpPort);
+    Preconditions.checkNotNull(config);
+    config.set(OZONE_KSM_HTTP_ADDRESS_KEY, ksmHost + ":" +  ksmHttpPort);
+    return getRestClient(config);
+  }
+
+  /**
+   * Returns an OzoneClient which will use REST protocol.
+   *
+   * @param config
+   *        Configuration to be used for OzoneClient creation
+   *
+   * @return OzoneClient
+   *
+   * @throws IOException
+   */
+  public static OzoneClient getRestClient(Configuration config)
+      throws IOException {
+    Preconditions.checkNotNull(config);
+    return getClient(getClientProtocol(OZONE_CLIENT_PROTOCOL_REST, config),
+        config);
+  }
+
+  /**
+   * Creates OzoneClient with the given ClientProtocol and Configuration.
+   *
+   * @param clientProtocol
+   *        Protocol to be used by the OzoneClient
+   *
+   * @param config
+   *        Configuration to be used for OzoneClient creation
+   */
+  private static OzoneClient getClient(ClientProtocol clientProtocol,
+                                       Configuration config) {
+    OzoneClientInvocationHandler clientHandler =
+        new OzoneClientInvocationHandler(clientProtocol);
+    ClientProtocol proxy = (ClientProtocol) Proxy.newProxyInstance(
+        OzoneClientInvocationHandler.class.getClassLoader(),
+        new Class<?>[]{ClientProtocol.class}, clientHandler);
+    return new OzoneClient(config, proxy);
+  }
+
+  /**
+   * Returns an instance of Protocol class.
+   *
+   * @param protocolClass
+   *        Class object of the ClientProtocol.
+   *
+   * @param config
+   *        Configuration used to initialize ClientProtocol.
+   *
+   * @return ClientProtocol
+   *
+   * @throws IOException
+   */
+  private static ClientProtocol getClientProtocol(
+      Class<? extends ClientProtocol> protocolClass, Configuration config)
       throws IOException {
-    Class<? extends ClientProtocol> protocolClass = null;
-    if(clientType != null) {
-      switch (clientType) {
-      case RPC:
-        protocolClass = OZONE_CLIENT_PROTOCOL_RPC;
-        break;
-      case REST:
-        protocolClass = OZONE_CLIENT_PROTOCOL_REST;
-        break;
-      default:
-        LOG.warn("Invalid ClientProtocol type, falling back to RPC.");
-        protocolClass = OZONE_CLIENT_PROTOCOL_RPC;
-        break;
-      }
-    } else {
-      protocolClass = (Class<ClientProtocol>)
-          getConfiguration().getClass(
-              OZONE_CLIENT_PROTOCOL, OZONE_CLIENT_PROTOCOL_RPC);
-    }
     try {
+      LOG.info("Using {} as client protocol.",
+          protocolClass.getCanonicalName());
       Constructor<? extends ClientProtocol> ctor =
           protocolClass.getConstructor(Configuration.class);
-      return ctor.newInstance(getConfiguration());
+      return ctor.newInstance(config);
     } catch (Exception e) {
       final String message = "Couldn't create protocol " + protocolClass;
       if (e.getCause() instanceof IOException) {
@@ -165,13 +300,4 @@ public final class OzoneClientFactory {
     }
   }
 
-  /**
-   * Sets the configuration, which will be used while creating OzoneClient.
-   *
-   * @param conf
-   */
-  public static void setConfiguration(Configuration conf) {
-    configuration = conf;
-  }
-
 }

+ 36 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/DefaultRestServerSelector.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rest;
+
+import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
+
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Default selector randomly picks one of the REST Server from the list.
+ */
+public class DefaultRestServerSelector implements RestServerSelector {
+
+  @Override
+  public ServiceInfo getRestServer(List<ServiceInfo> restServices) {
+    return restServices.get(
+        new Random().nextInt(restServices.size()));
+  }
+}

+ 56 - 26
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java

@@ -18,9 +18,12 @@
 
 package org.apache.hadoop.ozone.client.rest;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -36,14 +39,16 @@ import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
-
-import java.io.IOException;
 import org.apache.hadoop.ozone.client.rest.headers.Header;
 import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
 import org.apache.hadoop.ozone.client.rest.response.KeyInfo;
 import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
 import org.apache.hadoop.ozone.client.rpc.RpcClient;
 import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
+import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.ServicePort;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.apache.http.HttpEntity;
@@ -64,19 +69,20 @@ import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.text.ParseException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static java.net.HttpURLConnection.HTTP_CREATED;
 import static java.net.HttpURLConnection.HTTP_OK;
@@ -108,13 +114,7 @@ public class RestClient implements ClientProtocol {
     try {
       Preconditions.checkNotNull(conf);
       this.conf = conf;
-      int port = conf.getInt(OzoneConfigKeys.OZONE_REST_CLIENT_PORT,
-          OzoneConfigKeys.OZONE_REST_CLIENT_PORT_DEFAULT);
-      URIBuilder uriBuilder = new URIBuilder()
-          .setScheme("http")
-          .setHost(getOzoneRestHandlerHost())
-          .setPort(port);
-      this.ozoneRestUri = uriBuilder.build();
+
       long socketTimeout = conf.getTimeDuration(
           OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT,
           OzoneConfigKeys.OZONE_CLIENT_SOCKET_TIMEOUT_DEFAULT,
@@ -129,7 +129,8 @@ public class RestClient implements ClientProtocol {
 
       int maxConnectionPerRoute = conf.getInt(
           OzoneConfigKeys.OZONE_REST_CLIENT_HTTP_CONNECTION_PER_ROUTE_MAX,
-          OzoneConfigKeys.OZONE_REST_CLIENT_HTTP_CONNECTION_PER_ROUTE_MAX_DEFAULT
+          OzoneConfigKeys
+              .OZONE_REST_CLIENT_HTTP_CONNECTION_PER_ROUTE_MAX_DEFAULT
       );
 
       /*
@@ -152,26 +153,55 @@ public class RestClient implements ClientProtocol {
       this.ugi = UserGroupInformation.getCurrentUser();
       this.userRights = conf.getEnum(KSMConfigKeys.OZONE_KSM_USER_RIGHTS,
           KSMConfigKeys.OZONE_KSM_USER_RIGHTS_DEFAULT);
+
+      // TODO: Add new configuration parameter to configure RestServerSelector.
+      RestServerSelector defaultSelector = new DefaultRestServerSelector();
+      InetSocketAddress restServer = getOzoneRestServerAddress(defaultSelector);
+      URIBuilder uriBuilder = new URIBuilder()
+          .setScheme("http")
+          .setHost(restServer.getHostName())
+          .setPort(restServer.getPort());
+      this.ozoneRestUri = uriBuilder.build();
+
     } catch (URISyntaxException e) {
       throw new IOException(e);
     }
   }
 
-  /**
-   * Returns the REST server host to connect to.
-   *
-   * @return hostname of REST server
-   */
-  private String getOzoneRestHandlerHost() {
-    List<String> servers = new ArrayList<>(conf.getTrimmedStringCollection(
-        OzoneConfigKeys.OZONE_REST_SERVERS));
-    if(servers.isEmpty()) {
-      throw new IllegalArgumentException(OzoneConfigKeys.OZONE_REST_SERVERS +
-          " must be defined. See" +
-          " https://wiki.apache.org/hadoop/Ozone#Configuration for" +
-          " details on configuring Ozone.");
+  private InetSocketAddress getOzoneRestServerAddress(
+      RestServerSelector selector) throws IOException {
+    String httpAddress = conf.get(KSMConfigKeys.OZONE_KSM_HTTP_ADDRESS_KEY);
+
+    if (httpAddress == null) {
+      throw new IllegalArgumentException(
+          KSMConfigKeys.OZONE_KSM_HTTP_ADDRESS_KEY + " must be defined. See" +
+              " https://wiki.apache.org/hadoop/Ozone#Configuration for" +
+              " details on configuring Ozone.");
+    }
+
+    HttpGet httpGet = new HttpGet("http://" + httpAddress + "/serviceList");
+    HttpEntity entity = executeHttpRequest(httpGet);
+    try {
+      String serviceListJson = EntityUtils.toString(entity);
+
+      ObjectMapper objectMapper = new ObjectMapper();
+      TypeReference<List<ServiceInfo>> serviceInfoReference =
+          new TypeReference<List<ServiceInfo>>() {
+          };
+      List<ServiceInfo> services = objectMapper.readValue(
+          serviceListJson, serviceInfoReference);
+
+      List<ServiceInfo> dataNodeInfos = services.stream().filter(
+          a -> a.getNodeType().equals(OzoneProtos.NodeType.DATANODE))
+          .collect(Collectors.toList());
+
+      ServiceInfo restServer = selector.getRestServer(dataNodeInfos);
+
+      return NetUtils.createSocketAddr(restServer.getHostname() + ":" +
+          restServer.getPort(ServicePort.Type.HTTP));
+    } finally {
+      EntityUtils.consume(entity);
     }
-    return servers.get(new Random().nextInt(servers.size()));
   }
 
   @Override

+ 40 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rest/RestServerSelector.java

@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rest;
+
+import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
+
+import java.util.List;
+
+/**
+ * The implementor of this interface should select the REST server which will
+ * be used by the client to connect to Ozone Cluster, given list of
+ * REST Servers/DataNodes (DataNodes are the ones which hosts REST Service).
+ */
+public interface RestServerSelector {
+
+  /**
+   * Returns the REST Service which will be used by the client for connection.
+   *
+   * @param restServices list of available REST servers
+   * @return ServiceInfo
+   */
+  ServiceInfo getRestServer(List<ServiceInfo> restServices);
+
+}

+ 13 - 2
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.ozone.ksm.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.ksm.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.ksm.protocolPB
     .KeySpaceManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.ksm.protocolPB
@@ -54,6 +55,8 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
+import org.apache.hadoop.ozone.protocol.proto
+    .KeySpaceManagerProtocolProtos.ServicePort;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
 import org.apache.hadoop.scm.ScmConfigKeys;
@@ -123,8 +126,7 @@ public class RpcClient implements ClientProtocol {
 
     long scmVersion =
         RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
-    InetSocketAddress scmAddress =
-        OzoneClientUtils.getScmAddressForClients(conf);
+    InetSocketAddress scmAddress = getScmAddressForClient();
     RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
         ProtobufRpcEngine.class);
     this.storageContainerLocationClient =
@@ -150,6 +152,15 @@ public class RpcClient implements ClientProtocol {
     }
   }
 
+  private InetSocketAddress getScmAddressForClient() throws IOException {
+    List<ServiceInfo> services = keySpaceManagerClient.getServiceList();
+    ServiceInfo scmInfo = services.stream().filter(
+        a -> a.getNodeType().equals(OzoneProtos.NodeType.SCM))
+        .collect(Collectors.toList()).get(0);
+    return NetUtils.createSocketAddr(scmInfo.getHostname()+ ":" +
+        scmInfo.getPort(ServicePort.Type.RPC));
+  }
+
   @Override
   public void createVolume(String volumeName) throws IOException {
     createVolume(volumeName, VolumeArgs.newBuilder().build());

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/tools/Corona.java

@@ -187,8 +187,7 @@ public final class Corona extends Configured implements Tool {
     numberOfVolumesCreated = new AtomicInteger();
     numberOfBucketsCreated = new AtomicInteger();
     numberOfKeysAdded = new AtomicLong();
-    OzoneClientFactory.setConfiguration(conf);
-    ozoneClient = OzoneClientFactory.getClient();
+    ozoneClient = OzoneClientFactory.getClient(conf);
     objectStore = ozoneClient.getObjectStore();
     for (CoronaOps ops : CoronaOps.values()) {
       histograms.add(ops.ordinal(), new Histogram(new UniformReservoir()));

+ 6 - 10
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rest/TestOzoneRestClient.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.client.rest;
 
 import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.ozone.MiniOzoneClassicCluster;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -46,6 +45,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -77,15 +77,11 @@ public class TestOzoneRestClient {
         OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
     cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(1)
         .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
-    DataNode datanode = cluster.getDataNodes().get(0);
-    conf.set(OzoneConfigKeys.OZONE_CLIENT_PROTOCOL,
-        "org.apache.hadoop.ozone.client.rest.RestClient");
-    conf.set(OzoneConfigKeys.OZONE_REST_SERVERS,
-        datanode.getDatanodeHostname());
-    conf.set(OzoneConfigKeys.OZONE_REST_CLIENT_PORT,
-        Integer.toString(datanode.getInfoPort()));
-    OzoneClientFactory.setConfiguration(conf);
-    ozClient = OzoneClientFactory.getClient();
+
+    InetSocketAddress ksmHttpAddress = cluster.getKeySpaceManager()
+        .getHttpServer().getHttpAddress();
+    ozClient = OzoneClientFactory.getRestClient(ksmHttpAddress.getHostName(),
+        ksmHttpAddress.getPort(), conf);
     store = ozClient.getObjectStore();
   }
 

+ 1 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java

@@ -93,10 +93,7 @@ public class TestOzoneRpcClient {
     conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1);
     cluster = new MiniOzoneClassicCluster.Builder(conf).numDataNodes(10)
         .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
-    conf.set("ozone.client.protocol",
-        "org.apache.hadoop.ozone.client.rpc.RpcClient");
-    OzoneClientFactory.setConfiguration(conf);
-    ozClient = OzoneClientFactory.getClient();
+    ozClient = OzoneClientFactory.getRpcClient(conf);
     store = ozClient.getObjectStore();
     storageContainerLocationClient =
         cluster.createStorageContainerLocationClient();

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java

@@ -59,8 +59,7 @@ public class TestCloseContainerHandler {
     cluster.waitOzoneReady();
 
     //the easiest way to create an open container is creating a key
-    OzoneClientFactory.setConfiguration(conf);
-    OzoneClient client = OzoneClientFactory.getClient();
+    OzoneClient client = OzoneClientFactory.getClient(conf);
     ObjectStore objectStore = client.getObjectStore();
     objectStore.createVolume("test");
     objectStore.getVolume("test").createBucket("test");