Browse Source

ARN-851. Share NMTokens using NMTokenCache (api-based) between AMRMClient and NMClient instead of memory based approach which is used currently. Contributed by Omkar Vinit Joshi.
svn merge --ignore-ancestry -c 1495247 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1495249 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 12 years ago
parent
commit
7c2206be10
25 changed files with 163 additions and 170 deletions
  1. 0 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
  2. 0 7
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
  3. 1 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
  4. 2 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
  5. 1 8
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
  6. 0 6
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
  7. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
  8. 0 7
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
  9. 4 0
      hadoop-yarn-project/CHANGES.txt
  10. 4 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
  11. 0 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
  12. 1 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
  13. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
  14. 0 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
  15. 4 19
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
  16. 103 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java
  17. 0 13
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
  18. 6 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
  19. 0 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
  20. 4 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
  21. 3 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
  22. 7 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
  23. 5 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
  24. 10 17
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
  25. 5 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java

+ 0 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java

@@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
 import org.apache.hadoop.yarn.util.Clock;
@@ -62,6 +61,4 @@ public interface AppContext {
   Set<String> getBlacklistedNodes();
   
   ClientToAMTokenSecretManager getClientToAMTokenSecretManager();
-
-  Map<String, Token> getNMTokens();
 }

+ 0 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -886,8 +886,6 @@ public class MRAppMaster extends CompositeService {
     private final Configuration conf;
     private final ClusterInfo clusterInfo = new ClusterInfo();
     private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
-    private final ConcurrentHashMap<String, org.apache.hadoop.yarn.api.records.Token> nmTokens =
-        new ConcurrentHashMap<String, org.apache.hadoop.yarn.api.records.Token>();
 
     public RunningAppContext(Configuration config) {
       this.conf = config;
@@ -954,11 +952,6 @@ public class MRAppMaster extends CompositeService {
     public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
       return clientToAMTokenSecretManager;
     }
-    
-    @Override
-    public Map<String, org.apache.hadoop.yarn.api.records.Token> getNMTokens() {
-      return this.nmTokens;
-    }
   }
 
   @SuppressWarnings("unchecked")

+ 1 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java

@@ -235,8 +235,7 @@ public class ContainerLauncherImpl extends AbstractService implements
         MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
     LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
     super.serviceInit(conf);
-    cmProxy =
-        new ContainerManagementProtocolProxy(conf, context.getNMTokens());
+    cmProxy = new ContainerManagementProtocolProxy(conf);
   }
 
   protected void serviceStart() throws Exception {

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.RackResolver;
@@ -588,7 +589,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     // Setting NMTokens
     if (response.getNMTokens() != null) {
       for (NMToken nmToken : response.getNMTokens()) {
-        getContext().getNMTokens().put(nmToken.getNodeId().toString(),
+        NMTokenCache.setNMToken(nmToken.getNodeId().toString(),
             nmToken.getToken());
       }
     }

+ 1 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java

@@ -26,10 +26,9 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
+import org.apache.hadoop.yarn.util.Clock;
 
 import com.google.common.collect.Maps;
 
@@ -131,10 +130,4 @@ public class MockAppContext implements AppContext {
     // Not implemented
     return null;
   }
-  
-  @Override
-  public Map<String, Token> getNMTokens() {
-    // Not Implemented
-    return null;
-  }
 }

+ 0 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java

@@ -862,11 +862,5 @@ public class TestRuntimeEstimators {
     public ClientToAMTokenSecretManager getClientToAMTokenSecretManager() {
       return null;
     }
-    
-    @Override
-    public Map<String, Token> getNMTokens() {
-      // Not Implemented
-      return null;
-    }
   }
 }

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java

@@ -376,7 +376,7 @@ public class TestContainerLauncher {
                 containerId.getApplicationAttemptId(),
                 NodeId.newInstance(addr.getHostName(), addr.getPort()), "user");
           ContainerManagementProtocolProxy cmProxy =
-              new ContainerManagementProtocolProxy(conf, context.getNMTokens());
+              new ContainerManagementProtocolProxy(conf);
           ContainerManagementProtocolProxyData proxy =
               cmProxy.new ContainerManagementProtocolProxyData(
                 YarnRPC.create(conf), containerManagerBindAddr, containerId,

+ 0 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java

@@ -44,7 +44,6 @@ import org.apache.hadoop.service.Service;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -316,10 +315,4 @@ public class JobHistory extends AbstractService implements HistoryContext {
     // Not implemented.
     return null;
   }
-  
-  @Override
-  public Map<String, Token> getNMTokens() {
-    // Not Implemented.
-    return null;
-  }
 }

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -163,6 +163,10 @@ Release 2.1.0-beta - UNRELEASED
     ApplicationSubmissionContext to simplify the api. (Karthik Kambatla via
     acmurthy) 
 
+    YARN-851. Share NMTokens using NMTokenCache (api-based) between AMRMClient
+    and NMClient instead of memory based approach which is used currently. (Omkar
+    Vinit Joshi via vinodkv)
+
   NEW FEATURES
 
     YARN-482. FS: Extend SchedulingMode to intermediate queues. 

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java

@@ -36,8 +36,9 @@ import org.apache.hadoop.yarn.util.Records;
  * 
  * <p>The response contains critical details such as:
  * <ul>
- *   <li>Minimum capability for allocated resources in the cluster.</li>
  *   <li>Maximum capability for allocated resources in the cluster.</li>
+ *   <li><code>ApplicationACL</code>s for the application.</li>
+ *   <li>ClientToAMToken master key.</li>
  * </ul>
  * </p>
  * 
@@ -50,11 +51,12 @@ public abstract class RegisterApplicationMasterResponse {
   @Unstable
   public static RegisterApplicationMasterResponse newInstance(
       Resource minCapability, Resource maxCapability,
-      Map<ApplicationAccessType, String> acls) {
+      Map<ApplicationAccessType, String> acls, ByteBuffer key) {
     RegisterApplicationMasterResponse response =
         Records.newRecord(RegisterApplicationMasterResponse.class);
     response.setMaximumResourceCapability(maxCapability);
     response.setApplicationACLs(acls);
+    response.setClientToAMTokenMasterKey(key);
     return response;
   }
 

+ 0 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java

@@ -44,12 +44,10 @@ import org.apache.hadoop.yarn.util.Records;
  *     <li>HTTP uri of the node.</li>
  *     <li>{@link Resource} allocated to the container.</li>
  *     <li>{@link Priority} at which the container was allocated.</li>
- *     <li>{@link ContainerState} of the container.</li>
  *     <li>
  *       Container {@link Token} of the container, used to securely verify
  *       authenticity of the allocation. 
  *     </li>
- *     <li>{@link ContainerStatus} of the container.</li>
  *   </ul>
  * </p>
  * 

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

@@ -448,8 +448,7 @@ public class ApplicationMaster {
     resourceManager.start();
 
     containerListener = new NMCallbackHandler();
-    nmClientAsync =
-        new NMClientAsyncImpl(containerListener, resourceManager.getNMTokens());
+    nmClientAsync = new NMClientAsyncImpl(containerListener);
     nmClientAsync.init(conf);
     nmClientAsync.start();
 

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

@@ -100,7 +100,7 @@ public class TestDistributedShell {
     }
   }
 
-  @Test(timeout=30000)
+  @Test(timeout=90000)
   public void testDSShell() throws Exception {
 
     String[] args = {
@@ -128,7 +128,7 @@ public class TestDistributedShell {
 
   }
 
-  @Test(timeout=30000)
+  @Test(timeout=90000)
   public void testDSShellWithInvalidArgs() throws Exception {
     Client client = new Client(new Configuration(yarnCluster.getConfig()));
 

+ 0 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.client.api;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -35,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
@@ -249,14 +247,4 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
                                            Priority priority, 
                                            String resourceName, 
                                            Resource capability);
-
-  /**
-   * It returns the NMToken received on allocate call. It will not communicate
-   * with RM to get NMTokens. On allocate call whenever we receive new token
-   * along with container AMRMClient will cache this NMToken per node manager.
-   * This map returned should be shared with any application which is
-   * communicating with NodeManager (ex. NMClient) using NMTokens. If a new
-   * NMToken is received for the same node manager then it will be replaced. 
-   */
-  public abstract ConcurrentMap<String, Token> getNMTokens();
 }

+ 4 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java

@@ -22,21 +22,17 @@ package org.apache.hadoop.yarn.client.api;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
@@ -46,30 +42,19 @@ public abstract class NMClient extends AbstractService {
 
   /**
    * Create a new instance of NMClient.
-   * @param nmTokens need to pass map of NMTokens which are received on
-   * {@link AMRMClient#allocate(float)} call as a part of
-   * {@link AllocateResponse}. 
-   * key :- NodeAddr (host:port)
-   * Value :- Token {@link NMToken#getToken()}
    */
   @Public
-  public static NMClient createNMClient(ConcurrentMap<String, Token> nmTokens) {
-    NMClient client = new NMClientImpl(nmTokens);
+  public static NMClient createNMClient() {
+    NMClient client = new NMClientImpl();
     return client;
   }
 
   /**
    * Create a new instance of NMClient.
-   * @param nmTokens need to pass map of NMTokens which are received on
-   * {@link AMRMClient#allocate(float)} call as a part of
-   * {@link AllocateResponse}. 
-   * key :- NodeAddr (host:port)
-   * Value :- Token {@link NMToken#getToken()}
    */
   @Public
-  public static NMClient createNMClient(String name,
-      ConcurrentMap<String, Token> nmTokens) {
-    NMClient client = new NMClientImpl(name, nmTokens);
+  public static NMClient createNMClient(String name) {
+    NMClient client = new NMClientImpl(name);
     return client;
   }
 

+ 103 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMTokenCache.java

@@ -0,0 +1,103 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.api.records.Token;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * It manages NMTokens required for communicating with Node manager. Its a
+ * static token cache.
+ */
+@Public
+@Evolving
+public class NMTokenCache {
+  private static ConcurrentHashMap<String, Token> nmTokens;
+  
+  
+  static {
+    nmTokens = new ConcurrentHashMap<String, Token>();
+  }
+  
+  /**
+   * Returns NMToken, null if absent
+   * @param nodeAddr
+   * @return {@link Token} NMToken required for communicating with node
+   * manager
+   */
+  @Public
+  @Evolving
+  public static Token getNMToken(String nodeAddr) {
+    return nmTokens.get(nodeAddr);
+  }
+  
+  /**
+   * Sets the NMToken for node address
+   * @param nodeAddr node address (host:port)
+   * @param token NMToken
+   */
+  @Public
+  @Evolving
+  public static void setNMToken(String nodeAddr, Token token) {
+    nmTokens.put(nodeAddr, token);
+  }
+  
+  /**
+   * Returns true if NMToken is present in cache.
+   */
+  @Private
+  @VisibleForTesting
+  public static boolean containsNMToken(String nodeAddr) {
+    return nmTokens.containsKey(nodeAddr);
+  }
+  
+  /**
+   * Returns the number of NMTokens present in cache.
+   */
+  @Private
+  @VisibleForTesting
+  public static int numberOfNMTokensInCache() {
+    return nmTokens.size();
+  }
+  
+  /**
+   * Removes NMToken for specified node manager
+   * @param nodeAddr node address (host:port)
+   */
+  @Private
+  @VisibleForTesting
+  public static void removeNMToken(String nodeAddr) {
+    nmTokens.remove(nodeAddr);
+  }
+  
+  /**
+   * It will remove all the nm tokens from its cache
+   */
+  @Private
+  @VisibleForTesting
+  public static void clearCache() {
+    nmTokens.clear();
+  }
+}

+ 0 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java

@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.client.api.async;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -37,7 +36,6 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
@@ -198,17 +196,6 @@ extends AbstractService {
    */
   public abstract int getClusterNodeCount();
 
-  /**
-   * It returns the NMToken received on allocate call. It will not communicate
-   * with RM to get NMTokens. On allocate call whenever we receive new token
-   * along with new container AMRMClientAsync will cache this NMToken per node
-   * manager. This map returned should be shared with any application which is
-   * communicating with NodeManager (ex. NMClient / NMClientAsync) using
-   * NMTokens. If a new NMToken is received for the same node manager
-   * then it will be replaced. 
-   */
-  public abstract ConcurrentMap<String, Token> getNMTokens();
-  
   public interface CallbackHandler {
     
     /**

+ 6 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java

@@ -112,18 +112,16 @@ public abstract class NMClientAsync extends AbstractService {
   protected CallbackHandler callbackHandler;
 
   public static NMClientAsync createNMClientAsync(
-      CallbackHandler callbackHandler, ConcurrentMap<String, Token> nmTokens) {
-    return new NMClientAsyncImpl(callbackHandler, nmTokens);
+      CallbackHandler callbackHandler) {
+    return new NMClientAsyncImpl(callbackHandler);
   }
   
-  protected NMClientAsync(CallbackHandler callbackHandler,
-      ConcurrentMap<String, Token> nmTokens) {
-    this (NMClientAsync.class.getName(), callbackHandler, nmTokens);
+  protected NMClientAsync(CallbackHandler callbackHandler) {
+    this (NMClientAsync.class.getName(), callbackHandler);
   }
 
-  protected NMClientAsync(String name, CallbackHandler callbackHandler,
-      ConcurrentMap<String, Token> nmTokens) {
-    this (name, new NMClientImpl(nmTokens), callbackHandler);
+  protected NMClientAsync(String name, CallbackHandler callbackHandler) {
+    this (name, new NMClientImpl(), callbackHandler);
   }
 
   @Private

+ 0 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java

@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -40,7 +39,6 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -215,19 +213,6 @@ extends AMRMClientAsync<T> {
   public int getClusterNodeCount() {
     return client.getClusterNodeCount();
   }
-
-  /**
-   * It returns the NMToken received on allocate call. It will not communicate
-   * with RM to get NMTokens. On allocate call whenever we receive new token
-   * along with new container AMRMClientAsync will cache this NMToken per node
-   * manager. This map returned should be shared with any application which is
-   * communicating with NodeManager (ex. NMClient / NMClientAsync) using
-   * NMTokens. If a new NMToken is received for the same node manager
-   * then it will be replaced. 
-   */
-  public ConcurrentMap<String, Token> getNMTokens() {
-    return client.getNMTokens();
-  }
   
   private class HeartbeatThread extends Thread {
     public HeartbeatThread() {

+ 4 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java

@@ -82,14 +82,12 @@ public class NMClientAsyncImpl extends NMClientAsync {
   protected ConcurrentMap<ContainerId, StatefulContainer> containers =
       new ConcurrentHashMap<ContainerId, StatefulContainer>();
 
-  public NMClientAsyncImpl(CallbackHandler callbackHandler,
-      ConcurrentMap<String, Token> nmTokens) {
-    this(NMClientAsync.class.getName(), callbackHandler, nmTokens);
+  public NMClientAsyncImpl(CallbackHandler callbackHandler) {
+    this(NMClientAsync.class.getName(), callbackHandler);
   }
 
-  public NMClientAsyncImpl(String name, CallbackHandler callbackHandler,
-      ConcurrentMap<String, Token> nmTokens) {
-    this(name, new NMClientImpl(nmTokens), callbackHandler);
+  public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) {
+    this(name, new NMClientImpl(), callbackHandler);
   }
 
   @Private

+ 3 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java

@@ -34,7 +34,6 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,8 +55,8 @@ import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -82,7 +81,6 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       RecordFactoryProvider.getRecordFactory(null);
   
   private int lastResponseId = 0;
-  private ConcurrentHashMap<String, Token> nmTokens;
 
   protected ApplicationMasterProtocol rmClient;
   protected final ApplicationAttemptId appAttemptId;  
@@ -158,7 +156,6 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
     super(AMRMClientImpl.class.getName());
     this.appAttemptId = appAttemptId;
-    this.nmTokens = new ConcurrentHashMap<String, Token>();
   }
 
   @Override
@@ -285,12 +282,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   protected void populateNMTokens(AllocateResponse allocateResponse) {
     for (NMToken token : allocateResponse.getNMTokens()) {
       String nodeId = token.getNodeId().toString();
-      if (nmTokens.containsKey(nodeId)) {
+      if (NMTokenCache.containsNMToken(nodeId)) {
         LOG.debug("Replacing token for : " + nodeId);
       } else {
         LOG.debug("Received new token for : " + nodeId);
       }
-      nmTokens.put(nodeId, token.getToken());
+      NMTokenCache.setNMToken(nodeId, token.getToken());
     }
   }
 
@@ -577,9 +574,4 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
           + " #asks=" + ask.size());
     }
   }
-
-  @Override
-  public ConcurrentHashMap<String, Token> getNMTokens() {
-    return nmTokens;
-  }
 }

+ 7 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java

@@ -23,7 +23,6 @@ import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,6 +35,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -54,13 +54,10 @@ public class ContainerManagementProtocolProxy {
 
   private final int maxConnectedNMs;
   private final LinkedHashMap<String, ContainerManagementProtocolProxyData> cmProxy;
-  private Map<String, Token> nmTokens;
   private final Configuration conf;
   private final YarnRPC rpc;
   
-  public ContainerManagementProtocolProxy(Configuration conf,
-      Map<String, Token> nmTokens) {
-    this.nmTokens = nmTokens;
+  public ContainerManagementProtocolProxy(Configuration conf) {
     this.conf = conf;
 
     maxConnectedNMs =
@@ -86,10 +83,10 @@ public class ContainerManagementProtocolProxy {
     // This get call will update the map which is working as LRU cache.
     ContainerManagementProtocolProxyData proxy =
         cmProxy.get(containerManagerBindAddr);
-    
+
     while (proxy != null
         && !proxy.token.getIdentifier().equals(
-            nmTokens.get(containerManagerBindAddr).getIdentifier())) {
+            NMTokenCache.getNMToken(containerManagerBindAddr).getIdentifier())) {
       LOG.info("Refreshing proxy as NMToken got updated for node : "
           + containerManagerBindAddr);
       // Token is updated. check if anyone has already tried closing it.
@@ -112,7 +109,7 @@ public class ContainerManagementProtocolProxy {
     if (proxy == null) {
       proxy =
           new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr,
-              containerId, nmTokens.get(containerManagerBindAddr));
+              containerId, NMTokenCache.getNMToken(containerManagerBindAddr));
       if (cmProxy.size() > maxConnectedNMs) {
         // Number of existing proxy exceed the limit.
         String cmAddr = cmProxy.keySet().iterator().next();
@@ -172,10 +169,6 @@ public class ContainerManagementProtocolProxy {
     cmProxy.clear();
   }
   
-  public synchronized void setNMTokens(Map<String, Token> nmTokens) {
-    this.nmTokens = nmTokens;
-  }
-  
   public class ContainerManagementProtocolProxyData {
     private final String containerManagerBindAddr;
     private final ContainerManagementProtocol proxy;
@@ -201,10 +194,12 @@ public class ContainerManagementProtocolProxy {
     protected ContainerManagementProtocol newProxy(final YarnRPC rpc,
         String containerManagerBindAddr, ContainerId containerId, Token token)
         throws InvalidToken {
+
       if (token == null) {
         throw new InvalidToken("No NMToken sent for "
             + containerManagerBindAddr);
       }
+      
       final InetSocketAddress cmAddr =
           NetUtils.createSocketAddr(containerManagerBindAddr);
       LOG.info("Opening proxy : " + containerManagerBindAddr);

+ 5 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java

@@ -81,18 +81,15 @@ public class NMClientImpl extends NMClient {
       new ConcurrentHashMap<ContainerId, StartedContainer>();
 
   //enabled by default
- private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
- private ContainerManagementProtocolProxy cmProxy;
-  private ConcurrentMap<String, Token> nmTokens;
+  private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+  private ContainerManagementProtocolProxy cmProxy;
 
-  public NMClientImpl(ConcurrentMap<String, Token> nmTokens) {
+  public NMClientImpl() {
     super(NMClientImpl.class.getName());
-    this.nmTokens = nmTokens;
   }
 
-  public NMClientImpl(String name, ConcurrentMap<String, Token> nmTokens) {
+  public NMClientImpl(String name) {
     super(name);
-    this.nmTokens = nmTokens;
   }
 
   @Override
@@ -126,8 +123,7 @@ public class NMClientImpl extends NMClient {
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
-    cmProxy =
-        new ContainerManagementProtocolProxy(conf, nmTokens);
+    cmProxy = new ContainerManagementProtocolProxy(conf);
   }
   
   @Override

+ 10 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java

@@ -26,11 +26,9 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
 
 import junit.framework.Assert;
 
@@ -50,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -57,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest;
@@ -488,8 +488,8 @@ public class TestAMRMClient {
     int iterationsLeft = 2;
     Set<ContainerId> releases = new TreeSet<ContainerId>();
     
-    ConcurrentHashMap<String, Token> nmTokens = amClient.getNMTokens();
-    Assert.assertEquals(0, nmTokens.size());
+    NMTokenCache.clearCache();
+    Assert.assertEquals(0, NMTokenCache.numberOfNMTokensInCache());
     HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
     
     while (allocatedContainerCount < containersRequestedAny
@@ -505,19 +505,13 @@ public class TestAMRMClient {
         releases.add(rejectContainerId);
         amClient.releaseAssignedContainer(rejectContainerId);
       }
-      Assert.assertEquals(nmTokens.size(), amClient.getNMTokens().size());
-      Iterator<String> nodeI = nmTokens.keySet().iterator();
-      while (nodeI.hasNext()) {
-        String nodeId = nodeI.next();
-        if (!receivedNMTokens.containsKey(nodeId)) {
-          receivedNMTokens.put(nodeId, nmTokens.get(nodeId));
-        } else {
-          Assert.fail("Received token again for : " + nodeId);
+      
+      for (NMToken token : allocResponse.getNMTokens()) {
+        String nodeID = token.getNodeId().toString();
+        if (receivedNMTokens.containsKey(nodeID)) {
+          Assert.fail("Received token again for : " + nodeID);          
         }
-      }
-      nodeI = receivedNMTokens.keySet().iterator();
-      while (nodeI.hasNext()) {
-        nmTokens.remove(nodeI.next());
+        receivedNMTokens.put(nodeID, token.getToken());
       }
       
       if(allocatedContainerCount < containersRequestedAny) {
@@ -526,7 +520,6 @@ public class TestAMRMClient {
       }
     }
     
-    Assert.assertEquals(0, amClient.getNMTokens().size());
     // Should receive atleast 1 token
     Assert.assertTrue(receivedNMTokens.size() > 0
         && receivedNMTokens.size() <= nodeCount);

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java

@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -74,7 +75,6 @@ public class TestNMClient {
   List<NodeReport> nodeReports = null;
   ApplicationAttemptId attemptId = null;
   int nodeCount = 3;
-  ConcurrentHashMap<String, Token> nmTokens;
   
   @Before
   public void setup() throws YarnException, IOException {
@@ -136,7 +136,6 @@ public class TestNMClient {
     if (iterationsLeft == 0) {
       fail("Application hasn't bee started");
     }
-    nmTokens = new ConcurrentHashMap<String, Token>();
 
     // start am rm client
     rmClient =
@@ -148,7 +147,7 @@ public class TestNMClient {
     assertEquals(STATE.STARTED, rmClient.getServiceState());
 
     // start am nm client
-    nmClient = (NMClientImpl) NMClient.createNMClient(nmTokens);
+    nmClient = (NMClientImpl) NMClient.createNMClient();
     nmClient.init(conf);
     nmClient.start();
     assertNotNull(nmClient);
@@ -173,7 +172,7 @@ public class TestNMClient {
     nmClient.stop();
   }
 
-  @Test (timeout = 60000)
+  @Test (timeout = 180000)
   public void testNMClientNoCleanupOnStop()
       throws YarnException, IOException {
 
@@ -241,7 +240,8 @@ public class TestNMClient {
       }
       if (!allocResponse.getNMTokens().isEmpty()) {
         for (NMToken token : allocResponse.getNMTokens()) {
-          nmTokens.put(token.getNodeId().toString(), token.getToken());
+          NMTokenCache.setNMToken(token.getNodeId().toString(),
+              token.getToken());
         }
       }
       if(allocatedContainerCount < containersRequestedAny) {