|
@@ -20,14 +20,17 @@ package org.apache.hadoop.yarn.client.api.impl;
|
|
|
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
|
@@ -53,7 +56,7 @@ public class ContainerManagementProtocolProxy {
|
|
|
static final Log LOG = LogFactory.getLog(ContainerManagementProtocolProxy.class);
|
|
|
|
|
|
private final int maxConnectedNMs;
|
|
|
- private final LinkedHashMap<String, ContainerManagementProtocolProxyData> cmProxy;
|
|
|
+ private final Map<String, ContainerManagementProtocolProxyData> cmProxy;
|
|
|
private final Configuration conf;
|
|
|
private final YarnRPC rpc;
|
|
|
private NMTokenCache nmTokenCache;
|
|
@@ -70,16 +73,25 @@ public class ContainerManagementProtocolProxy {
|
|
|
maxConnectedNMs =
|
|
|
conf.getInt(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES,
|
|
|
YarnConfiguration.DEFAULT_NM_CLIENT_MAX_NM_PROXIES);
|
|
|
- if (maxConnectedNMs < 1) {
|
|
|
+ if (maxConnectedNMs < 0) {
|
|
|
throw new YarnRuntimeException(
|
|
|
YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES
|
|
|
- + " (" + maxConnectedNMs + ") can not be less than 1.");
|
|
|
+ + " (" + maxConnectedNMs + ") can not be less than 0.");
|
|
|
}
|
|
|
LOG.info(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES + " : "
|
|
|
+ maxConnectedNMs);
|
|
|
|
|
|
- cmProxy =
|
|
|
- new LinkedHashMap<String, ContainerManagementProtocolProxyData>();
|
|
|
+ if (maxConnectedNMs > 0) {
|
|
|
+ cmProxy =
|
|
|
+ new LinkedHashMap<String, ContainerManagementProtocolProxyData>();
|
|
|
+ } else {
|
|
|
+ cmProxy = Collections.emptyMap();
|
|
|
+ // Connections are not being cached so ensure connections close quickly
|
|
|
+ // to avoid creating thousands of RPC client threads on large clusters.
|
|
|
+ conf.setInt(
|
|
|
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
|
|
|
+ 0);
|
|
|
+ }
|
|
|
rpc = YarnRPC.create(conf);
|
|
|
}
|
|
|
|
|
@@ -117,13 +129,9 @@ public class ContainerManagementProtocolProxy {
|
|
|
proxy =
|
|
|
new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr,
|
|
|
containerId, nmTokenCache.getToken(containerManagerBindAddr));
|
|
|
- if (cmProxy.size() > maxConnectedNMs) {
|
|
|
- // Number of existing proxy exceed the limit.
|
|
|
- String cmAddr = cmProxy.keySet().iterator().next();
|
|
|
- removeProxy(cmProxy.get(cmAddr));
|
|
|
+ if (maxConnectedNMs > 0) {
|
|
|
+ addProxyToCache(containerManagerBindAddr, proxy);
|
|
|
}
|
|
|
-
|
|
|
- cmProxy.put(containerManagerBindAddr, proxy);
|
|
|
}
|
|
|
// This is to track active users of this proxy.
|
|
|
proxy.activeCallers++;
|
|
@@ -131,15 +139,52 @@ public class ContainerManagementProtocolProxy {
|
|
|
|
|
|
return proxy;
|
|
|
}
|
|
|
+
|
|
|
+ private void addProxyToCache(String containerManagerBindAddr,
|
|
|
+ ContainerManagementProtocolProxyData proxy) {
|
|
|
+ while (cmProxy.size() >= maxConnectedNMs) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Cleaning up the proxy cache, size=" + cmProxy.size()
|
|
|
+ + " max=" + maxConnectedNMs);
|
|
|
+ }
|
|
|
+ boolean removedProxy = false;
|
|
|
+ for (ContainerManagementProtocolProxyData otherProxy : cmProxy.values()) {
|
|
|
+ removedProxy = removeProxy(otherProxy);
|
|
|
+ if (removedProxy) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!removedProxy) {
|
|
|
+ // all of the proxies are currently in use and already scheduled
|
|
|
+ // for removal, so we need to wait until at least one of them closes
|
|
|
+ try {
|
|
|
+ this.wait();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (maxConnectedNMs > 0) {
|
|
|
+ cmProxy.put(containerManagerBindAddr, proxy);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
private void updateLRUCache(String containerManagerBindAddr) {
|
|
|
- ContainerManagementProtocolProxyData proxy =
|
|
|
- cmProxy.remove(containerManagerBindAddr);
|
|
|
- cmProxy.put(containerManagerBindAddr, proxy);
|
|
|
+ if (maxConnectedNMs > 0) {
|
|
|
+ ContainerManagementProtocolProxyData proxy =
|
|
|
+ cmProxy.remove(containerManagerBindAddr);
|
|
|
+ cmProxy.put(containerManagerBindAddr, proxy);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public synchronized void mayBeCloseProxy(
|
|
|
ContainerManagementProtocolProxyData proxy) {
|
|
|
+ tryCloseProxy(proxy);
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean tryCloseProxy(
|
|
|
+ ContainerManagementProtocolProxyData proxy) {
|
|
|
proxy.activeCallers--;
|
|
|
if (proxy.scheduledForClose && proxy.activeCallers < 0) {
|
|
|
LOG.info("Closing proxy : " + proxy.containerManagerBindAddr);
|
|
@@ -149,15 +194,18 @@ public class ContainerManagementProtocolProxy {
|
|
|
} finally {
|
|
|
this.notifyAll();
|
|
|
}
|
|
|
+ return true;
|
|
|
}
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
- private synchronized void removeProxy(
|
|
|
+ private synchronized boolean removeProxy(
|
|
|
ContainerManagementProtocolProxyData proxy) {
|
|
|
if (!proxy.scheduledForClose) {
|
|
|
proxy.scheduledForClose = true;
|
|
|
- mayBeCloseProxy(proxy);
|
|
|
+ return tryCloseProxy(proxy);
|
|
|
}
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
public synchronized void stopAllProxies() {
|