|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.service;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.ipc.Server;
|
|
@@ -53,8 +54,10 @@ import org.apache.hadoop.yarn.service.api.records.ComponentContainers;
|
|
|
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
|
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
|
|
|
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
|
|
|
+import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
|
|
|
import org.apache.hadoop.yarn.service.utils.FilterUtils;
|
|
|
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
|
|
+import org.apache.hadoop.yarn.service.utils.ServiceUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -64,6 +67,7 @@ import java.util.List;
|
|
|
|
|
|
import static org.apache.hadoop.yarn.service.component.ComponentEventType.DECOMMISSION_INSTANCE;
|
|
|
import static org.apache.hadoop.yarn.service.component.ComponentEventType.FLEX;
|
|
|
+import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_AM_CLIENT_PORT_RANGE;
|
|
|
|
|
|
public class ClientAMService extends AbstractService
|
|
|
implements ClientAMProtocol {
|
|
@@ -84,9 +88,11 @@ public class ClientAMService extends AbstractService
|
|
|
@Override protected void serviceStart() throws Exception {
|
|
|
Configuration conf = getConfig();
|
|
|
YarnRPC rpc = YarnRPC.create(conf);
|
|
|
- InetSocketAddress address = new InetSocketAddress(0);
|
|
|
+ String nodeHostString = getNMHostName();
|
|
|
+
|
|
|
+ InetSocketAddress address = new InetSocketAddress(nodeHostString, 0);
|
|
|
server = rpc.getServer(ClientAMProtocol.class, this, address, conf,
|
|
|
- context.secretManager, 1);
|
|
|
+ context.secretManager, 1, YARN_SERVICE_AM_CLIENT_PORT_RANGE);
|
|
|
|
|
|
// Enable service authorization?
|
|
|
if (conf.getBoolean(
|
|
@@ -97,9 +103,6 @@ public class ClientAMService extends AbstractService
|
|
|
|
|
|
server.start();
|
|
|
|
|
|
- String nodeHostString =
|
|
|
- System.getenv(ApplicationConstants.Environment.NM_HOST.name());
|
|
|
-
|
|
|
bindAddress = NetUtils.createSocketAddrForHost(nodeHostString,
|
|
|
server.getListenerAddress().getPort());
|
|
|
|
|
@@ -107,6 +110,12 @@ public class ClientAMService extends AbstractService
|
|
|
super.serviceStart();
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ String getNMHostName() throws BadClusterStateException {
|
|
|
+ return ServiceUtils.mandatoryEnvVariable(
|
|
|
+ ApplicationConstants.Environment.NM_HOST.name());
|
|
|
+ }
|
|
|
+
|
|
|
@Override protected void serviceStop() throws Exception {
|
|
|
if (server != null) {
|
|
|
server.stop();
|