|
@@ -23,21 +23,139 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
|
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
|
|
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|
|
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
|
|
import org.apache.hadoop.yarn.api.records.Token;
|
|
|
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
|
|
|
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
/**
|
|
|
- * It manages NMTokens required for communicating with Node manager. Its a
|
|
|
- * static token cache.
|
|
|
+ * NMTokenCache manages NMTokens required for an Application Master
|
|
|
+ * communicating with individual NodeManagers.
|
|
|
+ * <p/>
|
|
|
+ * By default Yarn client libraries {@link AMRMClient} and {@link NMClient} use
|
|
|
+ * {@link #getSingleton()} instance of the cache.
|
|
|
+ * <ul>
|
|
|
+ * <li>Using the singleton instance of the cache is appropriate when running a
|
|
|
+ * single ApplicationMaster in the same JVM.</li>
|
|
|
+ * <li>When using the singleton, users don't need to do anything special,
|
|
|
+ * {@link AMRMClient} and {@link NMClient} are already set up to use the default
|
|
|
+ * singleton {@link NMTokenCache}</li>
|
|
|
+ * </ul>
|
|
|
+ * <p/>
|
|
|
+ * If running multiple Application Masters in the same JVM, a different cache
|
|
|
+ * instance should be used for each Application Master.
|
|
|
+ * <p/>
|
|
|
+ * <ul>
|
|
|
+ * <li>
|
|
|
+ * If using the {@link AMRMClient} and the {@link NMClient}, setting up and using
|
|
|
+ * an instance cache is as follows:
|
|
|
+ * <p/>
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * NMTokenCache nmTokenCache = new NMTokenCache();
|
|
|
+ * AMRMClient rmClient = AMRMClient.createAMRMClient();
|
|
|
+ * NMClient nmClient = NMClient.createNMClient();
|
|
|
+ * nmClient.setNMTokenCache(nmTokenCache);
|
|
|
+ * ...
|
|
|
+ * </pre>
|
|
|
+ * </li>
|
|
|
+ * <li>
|
|
|
+ * If using the {@link AMRMClientAsync} and the {@link NMClientAsync}, setting up
|
|
|
+ * and using an instance cache is as follows:
|
|
|
+ * <p/>
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * NMTokenCache nmTokenCache = new NMTokenCache();
|
|
|
+ * AMRMClient rmClient = AMRMClient.createAMRMClient();
|
|
|
+ * NMClient nmClient = NMClient.createNMClient();
|
|
|
+ * nmClient.setNMTokenCache(nmTokenCache);
|
|
|
+ * AMRMClientAsync rmClientAsync = new AMRMClientAsync(rmClient, 1000, [AMRM_CALLBACK]);
|
|
|
+ * NMClientAsync nmClientAsync = new NMClientAsync("nmClient", nmClient, [NM_CALLBACK]);
|
|
|
+ * ...
|
|
|
+ * </pre>
|
|
|
+ * </li>
|
|
|
+ * <li>
|
|
|
+ * If using {@link ApplicationMasterProtocol} and
|
|
|
+ * {@link ContainerManagementProtocol} directly, setting up and using an
|
|
|
+ * instance cache is as follows:
|
|
|
+ * <p/>
|
|
|
+ *
|
|
|
+ * <pre>
|
|
|
+ * NMTokenCache nmTokenCache = new NMTokenCache();
|
|
|
+ * ...
|
|
|
+ * ApplicationMasterProtocol amPro = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
|
|
|
+ * ...
|
|
|
+ * AllocateRequest allocateRequest = ...
|
|
|
+ * ...
|
|
|
+ * AllocateResponse allocateResponse = rmClient.allocate(allocateRequest);
|
|
|
+ * for (NMToken token : allocateResponse.getNMTokens()) {
|
|
|
+ * nmTokenCache.setToken(token.getNodeId().toString(), token.getToken());
|
|
|
+ * }
|
|
|
+ * ...
|
|
|
+ * ContainerManagementProtocolProxy nmPro = ContainerManagementProtocolProxy(conf, nmTokenCache);
|
|
|
+ * ...
|
|
|
+ * nmPro.startContainer(container, containerContext);
|
|
|
+ * ...
|
|
|
+ * </pre>
|
|
|
+ * </li>
|
|
|
+ * </ul>
|
|
|
+ * It is also possible to mix the usage of a client (<code>AMRMClient</code> or
|
|
|
+ * <code>NMClient</code>, or the async versions of them) with a protocol proxy (
|
|
|
+ * <code>ContainerManagementProtocolProxy</code> or
|
|
|
+ * <code>ApplicationMasterProtocol</code>).
|
|
|
*/
|
|
|
@Public
|
|
|
@Evolving
|
|
|
public class NMTokenCache {
|
|
|
- private static ConcurrentHashMap<String, Token> nmTokens;
|
|
|
+ private static final NMTokenCache NM_TOKEN_CACHE = new NMTokenCache();
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns the singleton NM token cache.
|
|
|
+ *
|
|
|
+ * @return the singleton NM token cache.
|
|
|
+ */
|
|
|
+ public static NMTokenCache getSingleton() {
|
|
|
+ return NM_TOKEN_CACHE;
|
|
|
+ }
|
|
|
|
|
|
- static {
|
|
|
+ /**
|
|
|
+ * Returns NMToken, null if absent. Only the singleton obtained from
|
|
|
+ * {@link #getSingleton()} is looked at for the tokens. If you are using your
|
|
|
+ * own NMTokenCache that is different from the singleton, use
|
|
|
+ * {@link #getToken(String) }
|
|
|
+ *
|
|
|
+ * @param nodeAddr
|
|
|
+ * @return {@link Token} NMToken required for communicating with node manager
|
|
|
+ */
|
|
|
+ @Public
|
|
|
+ public static Token getNMToken(String nodeAddr) {
|
|
|
+ return NM_TOKEN_CACHE.getToken(nodeAddr);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Sets the NMToken for node address only in the singleton obtained from
|
|
|
+ * {@link #getSingleton()}. If you are using your own NMTokenCache that is
|
|
|
+ * different from the singleton, use {@link #setToken(String, Token) }
|
|
|
+ *
|
|
|
+ * @param nodeAddr
|
|
|
+ * node address (host:port)
|
|
|
+ * @param token
|
|
|
+ * NMToken
|
|
|
+ */
|
|
|
+ @Public
|
|
|
+ public static void setNMToken(String nodeAddr, Token token) {
|
|
|
+ NM_TOKEN_CACHE.setToken(nodeAddr, token);
|
|
|
+ }
|
|
|
+
|
|
|
+ private ConcurrentHashMap<String, Token> nmTokens;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Creates a NM token cache instance.
|
|
|
+ */
|
|
|
+ public NMTokenCache() {
|
|
|
nmTokens = new ConcurrentHashMap<String, Token>();
|
|
|
}
|
|
|
|
|
@@ -45,11 +163,11 @@ public class NMTokenCache {
|
|
|
* Returns NMToken, null if absent
|
|
|
* @param nodeAddr
|
|
|
* @return {@link Token} NMToken required for communicating with node
|
|
|
- * manager
|
|
|
+ * manager
|
|
|
*/
|
|
|
@Public
|
|
|
@Evolving
|
|
|
- public static Token getNMToken(String nodeAddr) {
|
|
|
+ public Token getToken(String nodeAddr) {
|
|
|
return nmTokens.get(nodeAddr);
|
|
|
}
|
|
|
|
|
@@ -60,7 +178,7 @@ public class NMTokenCache {
|
|
|
*/
|
|
|
@Public
|
|
|
@Evolving
|
|
|
- public static void setNMToken(String nodeAddr, Token token) {
|
|
|
+ public void setToken(String nodeAddr, Token token) {
|
|
|
nmTokens.put(nodeAddr, token);
|
|
|
}
|
|
|
|
|
@@ -69,7 +187,7 @@ public class NMTokenCache {
|
|
|
*/
|
|
|
@Private
|
|
|
@VisibleForTesting
|
|
|
- public static boolean containsNMToken(String nodeAddr) {
|
|
|
+ public boolean containsToken(String nodeAddr) {
|
|
|
return nmTokens.containsKey(nodeAddr);
|
|
|
}
|
|
|
|
|
@@ -78,7 +196,7 @@ public class NMTokenCache {
|
|
|
*/
|
|
|
@Private
|
|
|
@VisibleForTesting
|
|
|
- public static int numberOfNMTokensInCache() {
|
|
|
+ public int numberOfTokensInCache() {
|
|
|
return nmTokens.size();
|
|
|
}
|
|
|
|
|
@@ -88,7 +206,7 @@ public class NMTokenCache {
|
|
|
*/
|
|
|
@Private
|
|
|
@VisibleForTesting
|
|
|
- public static void removeNMToken(String nodeAddr) {
|
|
|
+ public void removeToken(String nodeAddr) {
|
|
|
nmTokens.remove(nodeAddr);
|
|
|
}
|
|
|
|
|
@@ -97,7 +215,7 @@ public class NMTokenCache {
|
|
|
*/
|
|
|
@Private
|
|
|
@VisibleForTesting
|
|
|
- public static void clearCache() {
|
|
|
+ public void clearCache() {
|
|
|
nmTokens.clear();
|
|
|
}
|
|
|
}
|