瀏覽代碼

YARN-2884. Added a proxy service in NM to proxy the the communication between AM and RM. Contributed by Kishore Chaliparambil

Jian He 10 年之前
父節點
當前提交
6f72f1e600
共有 19 個文件被更改,包括 3366 次插入91 次删除
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
  4. 34 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  5. 142 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java
  6. 70 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java
  7. 132 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java
  8. 592 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
  9. 265 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java
  10. 102 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java
  11. 138 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
  12. 71 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java
  13. 39 28
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  14. 677 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
  15. 65 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
  16. 469 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
  17. 58 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java
  18. 484 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
  19. 6 63
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -187,6 +187,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3970. Add REST api support for Application Priority.
     (Naganarasimha G R via vvasudev)
 
+    YARN-2884. Added a proxy service in NM to proxy the the communication
+    between AM and RM. (Kishore Chaliparambil via jianhe) 
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -1332,6 +1332,23 @@ public class YarnConfiguration extends Configuration {
   public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX
       + "application.classpath";
 
+  public static final String AMRM_PROXY_ENABLED = NM_PREFIX
+      + "amrmproxy.enable";
+  public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false;
+  public static final String AMRM_PROXY_ADDRESS = NM_PREFIX
+      + "amrmproxy.address";
+  public static final int DEFAULT_AMRM_PROXY_PORT = 8048;
+  public static final String DEFAULT_AMRM_PROXY_ADDRESS = "0.0.0.0:"
+      + DEFAULT_AMRM_PROXY_PORT;
+  public static final String AMRM_PROXY_CLIENT_THREAD_COUNT = NM_PREFIX
+      + "amrmproxy.client.thread-count";
+  public static final int DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT = 25;
+  public static final String AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
+      NM_PREFIX + "amrmproxy.interceptor-class.pipeline";
+  public static final String DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE =
+      "org.apache.hadoop.yarn.server.nodemanager.amrmproxy."
+          + "DefaultRequestInterceptor";
+
   /**
    * Default platform-agnostic CLASSPATH for YARN applications. A
    * comma-separated list of CLASSPATH entries. The parameter expansion marker

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java

@@ -86,6 +86,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
         .add(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS);
     configurationPropsToSkipCompare
         .add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
 
     // Ignore all YARN Application Timeline Service (version 1) properties
     configurationPrefixToSkipCompare.add("yarn.timeline-service.");

+ 34 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -2259,4 +2259,38 @@
     <value></value>
   </property>
 
+  <property>
+    <description>
+    Enable/Disable AMRMProxyService in the node manager. This service is used to intercept
+    calls from the application masters to the resource manager.
+    </description>
+    <name>yarn.nodemanager.amrmproxy.enable</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+    The address of the AMRMProxyService listener.
+    </description>
+    <name>yarn.nodemanager.amrmproxy.address</name>
+    <value>0.0.0.0:8048</value>
+  </property>
+
+  <property>
+    <description>
+    The number of threads used to handle requests by the AMRMProxyService.
+    </description>
+    <name>yarn.nodemanager.amrmproxy.client.thread-count</name>
+    <value>25</value>
+  </property>
+
+  <property>
+    <description>
+    The comma separated list of class names that implement the RequestInterceptor interface. This is used by the
+    AMRMProxyService to create the request processing pipeline for applications.
+    </description>
+    <name>yarn.nodemanager.amrmproxy.interceptor-class.pipeline</name>
+    <value>org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor</value>
+  </property>
+
 </configuration>

+ 142 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerSecurityUtils.java

@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.utils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that contains commonly used server methods.
+ *
+ */
+@Private
+public final class YarnServerSecurityUtils {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(YarnServerSecurityUtils.class);
+
+  private YarnServerSecurityUtils() {
+  }
+
+  /**
+   * Authorizes the current request and returns the AMRMTokenIdentifier for the
+   * current application.
+   *
+   * @return the AMRMTokenIdentifier instance for the current user
+   * @throws YarnException
+   */
+  public static AMRMTokenIdentifier authorizeRequest()
+      throws YarnException {
+
+    UserGroupInformation remoteUgi;
+    try {
+      remoteUgi = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      String msg =
+          "Cannot obtain the user-name for authorizing ApplicationMaster. "
+              + "Got exception: " + StringUtils.stringifyException(e);
+      LOG.warn(msg);
+      throw RPCUtil.getRemoteException(msg);
+    }
+
+    boolean tokenFound = false;
+    String message = "";
+    AMRMTokenIdentifier appTokenIdentifier = null;
+    try {
+      appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
+      if (appTokenIdentifier == null) {
+        tokenFound = false;
+        message = "No AMRMToken found for user " + remoteUgi.getUserName();
+      } else {
+        tokenFound = true;
+      }
+    } catch (IOException e) {
+      tokenFound = false;
+      message =
+          "Got exception while looking for AMRMToken for user "
+              + remoteUgi.getUserName();
+    }
+
+    if (!tokenFound) {
+      LOG.warn(message);
+      throw RPCUtil.getRemoteException(message);
+    }
+
+    return appTokenIdentifier;
+  }
+
+  // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
+  // currently sets only the required id, but iterate through anyways just to be
+  // sure.
+  private static AMRMTokenIdentifier selectAMRMTokenIdentifier(
+      UserGroupInformation remoteUgi) throws IOException {
+    AMRMTokenIdentifier result = null;
+    Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
+    for (TokenIdentifier tokenId : tokenIds) {
+      if (tokenId instanceof AMRMTokenIdentifier) {
+        result = (AMRMTokenIdentifier) tokenId;
+        break;
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * Parses the container launch context and returns a Credential instance that
+   * contains all the tokens from the launch context. 
+   * @param launchContext
+   * @return the credential instance
+   * @throws IOException
+   */
+  public static Credentials parseCredentials(
+      ContainerLaunchContext launchContext) throws IOException {
+    Credentials credentials = new Credentials();
+    ByteBuffer tokens = launchContext.getTokens();
+
+    if (tokens != null) {
+      DataInputByteBuffer buf = new DataInputByteBuffer();
+      tokens.rewind();
+      buf.reset(tokens);
+      credentials.readTokenStorageStream(buf);
+      if (LOG.isDebugEnabled()) {
+        for (Token<? extends TokenIdentifier> tk : credentials
+            .getAllTokens()) {
+          LOG.debug(tk.getService() + " = " + tk.toString());
+        }
+      }
+    }
+
+    return credentials;
+  }
+}

+ 70 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContext.java

@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+
+/**
+ * Interface that can be used by the intercepter plugins to get the information
+ * about one application.
+ *
+ */
+public interface AMRMProxyApplicationContext {
+
+  /**
+   * Gets the configuration object instance.
+   * @return the configuration object.
+   */
+  Configuration getConf();
+
+  /**
+   * Gets the application attempt identifier.
+   * @return the application attempt identifier.
+   */
+  ApplicationAttemptId getApplicationAttemptId();
+
+  /**
+   * Gets the application submitter.
+   * @return the application submitter
+   */
+  String getUser();
+
+  /**
+   * Gets the application's AMRMToken that is issued by the RM.
+   * @return the application's AMRMToken that is issued by the RM.
+   */
+  Token<AMRMTokenIdentifier> getAMRMToken();
+
+  /**
+   * Gets the application's local AMRMToken issued by the proxy service.
+   * @return the application's local AMRMToken issued by the proxy service.
+   */
+  Token<AMRMTokenIdentifier> getLocalAMRMToken();
+
+  /**
+   * Gets the NMContext object.
+   * @return the NMContext.
+   */
+  Context getNMCotext();
+
+}

+ 132 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java

@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+
+/**
+ * Encapsulates the information about one application that is needed by the
+ * request intercepters.
+ *
+ */
+public class AMRMProxyApplicationContextImpl implements
+    AMRMProxyApplicationContext {
+  private final Configuration conf;
+  private final Context nmContext;
+  private final ApplicationAttemptId applicationAttemptId;
+  private final String user;
+  private Integer localTokenKeyId;
+  private Token<AMRMTokenIdentifier> amrmToken;
+  private Token<AMRMTokenIdentifier> localToken;
+
+  /**
+   * Create an instance of the AMRMProxyApplicationContext.
+   * 
+   * @param nmContext
+   * @param conf
+   * @param applicationAttemptId
+   * @param user
+   * @param amrmToken
+   */
+  public AMRMProxyApplicationContextImpl(Context nmContext,
+      Configuration conf, ApplicationAttemptId applicationAttemptId,
+      String user, Token<AMRMTokenIdentifier> amrmToken,
+      Token<AMRMTokenIdentifier> localToken) {
+    this.nmContext = nmContext;
+    this.conf = conf;
+    this.applicationAttemptId = applicationAttemptId;
+    this.user = user;
+    this.amrmToken = amrmToken;
+    this.localToken = localToken;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return applicationAttemptId;
+  }
+
+  @Override
+  public String getUser() {
+    return user;
+  }
+
+  @Override
+  public synchronized Token<AMRMTokenIdentifier> getAMRMToken() {
+    return amrmToken;
+  }
+
+  /**
+   * Sets the application's AMRMToken.
+   */
+  public synchronized void setAMRMToken(
+      Token<AMRMTokenIdentifier> amrmToken) {
+    this.amrmToken = amrmToken;
+  }
+
+  @Override
+  public synchronized Token<AMRMTokenIdentifier> getLocalAMRMToken() {
+    return this.localToken;
+  }
+
+  /**
+   * Sets the application's AMRMToken.
+   */
+  public synchronized void setLocalAMRMToken(
+      Token<AMRMTokenIdentifier> localToken) {
+    this.localToken = localToken;
+    this.localTokenKeyId = null;
+  }
+
+  @Private
+  public synchronized int getLocalAMRMTokenKeyId() {
+    Integer keyId = this.localTokenKeyId;
+    if (keyId == null) {
+      try {
+        if (this.localToken == null) {
+          throw new YarnRuntimeException("Missing AMRM token for "
+              + this.applicationAttemptId);
+        }
+        keyId = this.amrmToken.decodeIdentifier().getKeyId();
+        this.localTokenKeyId = keyId;
+      } catch (IOException e) {
+        throw new YarnRuntimeException("AMRM token decode error for "
+            + this.applicationAttemptId, e);
+      }
+    }
+    return keyId;
+  }
+
+  @Override
+  public Context getNMCotext() {
+    return nmContext;
+  }
+}

+ 592 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java

@@ -0,0 +1,592 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * AMRMProxyService is a service that runs on each node manager that can be used
+ * to intercept and inspect messages from application master to the cluster
+ * resource manager. It listens to messages from the application master and
+ * creates a request intercepting pipeline instance for each application. The
+ * pipeline is a chain of intercepter instances that can inspect and modify the
+ * request/response as needed.
+ */
+public class AMRMProxyService extends AbstractService implements
+    ApplicationMasterProtocol {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(AMRMProxyService.class);
+  private Server server;
+  private final Context nmContext;
+  private final AsyncDispatcher dispatcher;
+  private InetSocketAddress listenerEndpoint;
+  private AMRMProxyTokenSecretManager secretManager;
+  private Map<ApplicationId, RequestInterceptorChainWrapper> applPipelineMap;
+
+  /**
+   * Creates an instance of the service.
+   * 
+   * @param nmContext
+   * @param dispatcher
+   */
+  public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) {
+    super(AMRMProxyService.class.getName());
+    Preconditions.checkArgument(nmContext != null, "nmContext is null");
+    Preconditions.checkArgument(dispatcher != null, "dispatcher is null");
+    this.nmContext = nmContext;
+    this.dispatcher = dispatcher;
+    this.applPipelineMap =
+        new ConcurrentHashMap<ApplicationId, RequestInterceptorChainWrapper>();
+
+    this.dispatcher.register(ApplicationEventType.class,
+        new ApplicationEventHandler());
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    LOG.info("Starting AMRMProxyService");
+    Configuration conf = getConfig();
+    YarnRPC rpc = YarnRPC.create(conf);
+    UserGroupInformation.setConfiguration(conf);
+
+    this.listenerEndpoint =
+        conf.getSocketAddr(YarnConfiguration.AMRM_PROXY_ADDRESS,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_PORT);
+
+    Configuration serverConf = new Configuration(conf);
+    serverConf.set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        SaslRpcServer.AuthMethod.TOKEN.toString());
+
+    int numWorkerThreads =
+        serverConf.getInt(
+            YarnConfiguration.AMRM_PROXY_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT);
+
+    this.secretManager = new AMRMProxyTokenSecretManager(serverConf);
+    this.secretManager.start();
+
+    this.server =
+        rpc.getServer(ApplicationMasterProtocol.class, this,
+            listenerEndpoint, serverConf, this.secretManager,
+            numWorkerThreads);
+
+    this.server.start();
+    LOG.info("AMRMProxyService listening on address: "
+        + this.server.getListenerAddress());
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Stopping AMRMProxyService");
+    if (this.server != null) {
+      this.server.stop();
+    }
+
+    this.secretManager.stop();
+
+    super.serviceStop();
+  }
+
+  /**
+   * This is called by the AMs started on this node to register with the RM.
+   * This method does the initial authorization and then forwards the request to
+   * the application instance specific intercepter chain.
+   */
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    LOG.info("Registering application master." + " Host:"
+        + request.getHost() + " Port:" + request.getRpcPort()
+        + " Tracking Url:" + request.getTrackingUrl());
+    RequestInterceptorChainWrapper pipeline =
+        authorizeAndGetInterceptorChain();
+    return pipeline.getRootInterceptor()
+        .registerApplicationMaster(request);
+  }
+
+  /**
+   * This is called by the AMs started on this node to unregister from the RM.
+   * This method does the initial authorization and then forwards the request to
+   * the application instance specific intercepter chain.
+   */
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    LOG.info("Finishing application master. Tracking Url:"
+        + request.getTrackingUrl());
+    RequestInterceptorChainWrapper pipeline =
+        authorizeAndGetInterceptorChain();
+    return pipeline.getRootInterceptor().finishApplicationMaster(request);
+  }
+
+  /**
+   * This is called by the AMs started on this node to send heart beat to RM.
+   * This method does the initial authorization and then forwards the request to
+   * the application instance specific pipeline, which is a chain of request
+   * intercepter objects. One application request processing pipeline is created
+   * per AM instance.
+   */
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException, IOException {
+    AMRMTokenIdentifier amrmTokenIdentifier =
+        YarnServerSecurityUtils.authorizeRequest();
+    RequestInterceptorChainWrapper pipeline =
+        getInterceptorChain(amrmTokenIdentifier);
+    AllocateResponse allocateResponse =
+        pipeline.getRootInterceptor().allocate(request);
+
+    updateAMRMTokens(amrmTokenIdentifier, pipeline, allocateResponse);
+
+    return allocateResponse;
+  }
+
+  /**
+   * Callback from the ContainerManager implementation for initializing the
+   * application request processing pipeline.
+   *
+   * @param request - encapsulates information for starting an AM
+   * @throws IOException
+   * @throws YarnException
+   */
+  public void processApplicationStartRequest(StartContainerRequest request)
+      throws IOException, YarnException {
+    LOG.info("Callback received for initializing request "
+        + "processing pipeline for an AM");
+    ContainerTokenIdentifier containerTokenIdentifierForKey =
+        BuilderUtils.newContainerTokenIdentifier(request
+            .getContainerToken());
+    ApplicationAttemptId appAttemptId =
+        containerTokenIdentifierForKey.getContainerID()
+            .getApplicationAttemptId();
+    Credentials credentials =
+        YarnServerSecurityUtils.parseCredentials(request
+            .getContainerLaunchContext());
+
+    Token<AMRMTokenIdentifier> amrmToken =
+        getFirstAMRMToken(credentials.getAllTokens());
+    if (amrmToken == null) {
+      throw new YarnRuntimeException(
+          "AMRMToken not found in the start container request for application:"
+              + appAttemptId.toString());
+    }
+
+    // Substitute the existing AMRM Token with a local one. Keep the rest of the
+    // tokens in the credentials intact.
+    Token<AMRMTokenIdentifier> localToken =
+        this.secretManager.createAndGetAMRMToken(appAttemptId);
+    credentials.addToken(localToken.getService(), localToken);
+
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    request.getContainerLaunchContext().setTokens(
+        ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+
+    initializePipeline(containerTokenIdentifierForKey.getContainerID()
+        .getApplicationAttemptId(),
+        containerTokenIdentifierForKey.getApplicationSubmitter(),
+        amrmToken, localToken);
+  }
+
+  /**
+   * Initializes the request intercepter pipeline for the specified application.
+   * 
+   * @param applicationAttemptId
+   * @param user
+   * @param amrmToken
+   */
+  protected void initializePipeline(
+      ApplicationAttemptId applicationAttemptId, String user,
+      Token<AMRMTokenIdentifier> amrmToken,
+      Token<AMRMTokenIdentifier> localToken) {
+    RequestInterceptorChainWrapper chainWrapper = null;
+    synchronized (applPipelineMap) {
+      if (applPipelineMap.containsKey(applicationAttemptId.getApplicationId())) {
+        LOG.warn("Request to start an already existing appId was received. "
+            + " This can happen if an application failed and a new attempt "
+            + "was created on this machine.  ApplicationId: "
+            + applicationAttemptId.toString());
+        return;
+      }
+
+      chainWrapper = new RequestInterceptorChainWrapper();
+      this.applPipelineMap.put(applicationAttemptId.getApplicationId(),
+          chainWrapper);
+    }
+
+    // We register the pipeline instance in the map first and then initialize it
+    // later because chain initialization can be expensive and we would like to
+    // release the lock as soon as possible to prevent other applications from
+    // blocking when one application's chain is initializing
+    LOG.info("Initializing request processing pipeline for application. "
+        + " ApplicationId:" + applicationAttemptId + " for the user: "
+        + user);
+
+    RequestInterceptor interceptorChain =
+        this.createRequestInterceptorChain();
+    interceptorChain.init(createApplicationMasterContext(
+        applicationAttemptId, user, amrmToken, localToken));
+    chainWrapper.init(interceptorChain, applicationAttemptId);
+  }
+
+  /**
+   * Shuts down the request processing pipeline for the specified application
+   * attempt id.
+   *
+   * @param applicationId
+   */
+  protected void stopApplication(ApplicationId applicationId) {
+    Preconditions.checkArgument(applicationId != null,
+        "applicationId is null");
+    RequestInterceptorChainWrapper pipeline =
+        this.applPipelineMap.remove(applicationId);
+
+    if (pipeline == null) {
+      LOG.info("Request to stop an application that does not exist. Id:"
+          + applicationId);
+    } else {
+      LOG.info("Stopping the request processing pipeline for application: "
+          + applicationId);
+      try {
+        pipeline.getRootInterceptor().shutdown();
+      } catch (Throwable ex) {
+        LOG.warn(
+            "Failed to shutdown the request processing pipeline for app:"
+                + applicationId, ex);
+      }
+    }
+  }
+
+  private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier,
+      RequestInterceptorChainWrapper pipeline,
+      AllocateResponse allocateResponse) {
+    AMRMProxyApplicationContextImpl context =
+        (AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor()
+            .getApplicationContext();
+
+    // check to see if the RM has issued a new AMRMToken & accordingly update
+    // the real ARMRMToken in the current context
+    if (allocateResponse.getAMRMToken() != null) {
+      org.apache.hadoop.yarn.api.records.Token token =
+          allocateResponse.getAMRMToken();
+
+      org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newTokenId =
+          new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
+              token.getIdentifier().array(), token.getPassword().array(),
+              new Text(token.getKind()), new Text(token.getService()));
+
+      context.setAMRMToken(newTokenId);
+    }
+
+    // Check if the local AMRMToken is rolled up and update the context and
+    // response accordingly
+    MasterKeyData nextMasterKey =
+        this.secretManager.getNextMasterKeyData();
+
+    if (nextMasterKey != null
+        && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
+            .getKeyId()) {
+      Token<AMRMTokenIdentifier> localToken = context.getLocalAMRMToken();
+      if (nextMasterKey.getMasterKey().getKeyId() != context
+          .getLocalAMRMTokenKeyId()) {
+        LOG.info("The local AMRMToken has been rolled-over."
+            + " Send new local AMRMToken back to application: "
+            + pipeline.getApplicationId());
+        localToken =
+            this.secretManager.createAndGetAMRMToken(pipeline
+                .getApplicationAttemptId());
+        context.setLocalAMRMToken(localToken);
+      }
+
+      allocateResponse
+          .setAMRMToken(org.apache.hadoop.yarn.api.records.Token
+              .newInstance(localToken.getIdentifier(), localToken
+                  .getKind().toString(), localToken.getPassword(),
+                  localToken.getService().toString()));
+    }
+  }
+
+  private AMRMProxyApplicationContext createApplicationMasterContext(
+      ApplicationAttemptId applicationAttemptId, String user,
+      Token<AMRMTokenIdentifier> amrmToken,
+      Token<AMRMTokenIdentifier> localToken) {
+    AMRMProxyApplicationContextImpl appContext =
+        new AMRMProxyApplicationContextImpl(this.nmContext, getConfig(),
+            applicationAttemptId, user, amrmToken, localToken);
+    return appContext;
+  }
+
+  /**
+   * Gets the Request intercepter chains for all the applications.
+   * 
+   * @return the request intercepter chains.
+   */
+  protected Map<ApplicationId, RequestInterceptorChainWrapper> getPipelines() {
+    return this.applPipelineMap;
+  }
+
+  /**
+   * This method creates and returns reference of the first intercepter in the
+   * chain of request intercepter instances.
+   *
+   * @return the reference of the first intercepter in the chain
+   */
+  protected RequestInterceptor createRequestInterceptorChain() {
+    Configuration conf = getConfig();
+
+    List<String> interceptorClassNames = getInterceptorClassNames(conf);
+
+    RequestInterceptor pipeline = null;
+    RequestInterceptor current = null;
+    for (String interceptorClassName : interceptorClassNames) {
+      try {
+        Class<?> interceptorClass =
+            conf.getClassByName(interceptorClassName);
+        if (RequestInterceptor.class.isAssignableFrom(interceptorClass)) {
+          RequestInterceptor interceptorInstance =
+              (RequestInterceptor) ReflectionUtils.newInstance(
+                  interceptorClass, conf);
+          if (pipeline == null) {
+            pipeline = interceptorInstance;
+            current = interceptorInstance;
+            continue;
+          } else {
+            current.setNextInterceptor(interceptorInstance);
+            current = interceptorInstance;
+          }
+        } else {
+          throw new YarnRuntimeException("Class: " + interceptorClassName
+              + " not instance of "
+              + RequestInterceptor.class.getCanonicalName());
+        }
+      } catch (ClassNotFoundException e) {
+        throw new YarnRuntimeException(
+            "Could not instantiate ApplicationMasterRequestInterceptor: "
+                + interceptorClassName, e);
+      }
+    }
+
+    if (pipeline == null) {
+      throw new YarnRuntimeException(
+          "RequestInterceptor pipeline is not configured in the system");
+    }
+    return pipeline;
+  }
+
+  /**
+   * Returns the comma separated intercepter class names from the configuration.
+   *
+   * @param conf
+   * @return the intercepter class names as an instance of ArrayList
+   */
+  private List<String> getInterceptorClassNames(Configuration conf) {
+    String configuredInterceptorClassNames =
+        conf.get(
+            YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
+
+    List<String> interceptorClassNames = new ArrayList<String>();
+    Collection<String> tempList =
+        StringUtils.getStringCollection(configuredInterceptorClassNames);
+    for (String item : tempList) {
+      interceptorClassNames.add(item.trim());
+    }
+
+    return interceptorClassNames;
+  }
+
+  /**
+   * Authorizes the request and returns the application specific request
+   * processing pipeline.
+   *
+   * @return the the intercepter wrapper instance
+   * @throws YarnException
+   */
+  private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain()
+      throws YarnException {
+    AMRMTokenIdentifier tokenIdentifier =
+        YarnServerSecurityUtils.authorizeRequest();
+    return getInterceptorChain(tokenIdentifier);
+  }
+
+  private RequestInterceptorChainWrapper getInterceptorChain(
+      AMRMTokenIdentifier tokenIdentifier) throws YarnException {
+    ApplicationAttemptId appAttemptId =
+        tokenIdentifier.getApplicationAttemptId();
+
+    synchronized (this.applPipelineMap) {
+      if (!this.applPipelineMap.containsKey(appAttemptId
+          .getApplicationId())) {
+        throw new YarnException(
+            "The AM request processing pipeline is not initialized for app: "
+                + appAttemptId.getApplicationId().toString());
+      }
+
+      return this.applPipelineMap.get(appAttemptId.getApplicationId());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private Token<AMRMTokenIdentifier> getFirstAMRMToken(
+      Collection<Token<? extends TokenIdentifier>> allTokens) {
+    Iterator<Token<? extends TokenIdentifier>> iter = allTokens.iterator();
+    while (iter.hasNext()) {
+      Token<? extends TokenIdentifier> token = iter.next();
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        return (Token<AMRMTokenIdentifier>) token;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Private class for handling application stop events.
+   *
+   */
+  class ApplicationEventHandler implements EventHandler<ApplicationEvent> {
+
+    @Override
+    public void handle(ApplicationEvent event) {
+      Application app =
+          AMRMProxyService.this.nmContext.getApplications().get(
+              event.getApplicationID());
+      if (app != null) {
+        switch (event.getType()) {
+        case FINISH_APPLICATION:
+          LOG.info("Application stop event received for stopping AppId:"
+              + event.getApplicationID().toString());
+          AMRMProxyService.this.stopApplication(event.getApplicationID());
+          break;
+        default:
+          LOG.debug("AMRMProxy is ignoring event: " + event.getType());
+          break;
+        }
+      } else {
+        LOG.warn("Event " + event + " sent to absent application "
+            + event.getApplicationID());
+      }
+    }
+  }
+
+  /**
+   * Private structure for encapsulating RequestInterceptor and
+   * ApplicationAttemptId instances.
+   *
+   */
+  private static class RequestInterceptorChainWrapper {
+    private RequestInterceptor rootInterceptor;
+    private ApplicationAttemptId applicationAttemptId;
+
+    /**
+     * Initializes the wrapper with the specified parameters.
+     * 
+     * @param rootInterceptor
+     * @param applicationAttemptId
+     */
+    public synchronized void init(RequestInterceptor rootInterceptor,
+        ApplicationAttemptId applicationAttemptId) {
+      this.rootInterceptor = rootInterceptor;
+      this.applicationAttemptId = applicationAttemptId;
+    }
+
+    /**
+     * Gets the root request intercepter.
+     * 
+     * @return the root request intercepter
+     */
+    public synchronized RequestInterceptor getRootInterceptor() {
+      return rootInterceptor;
+    }
+
+    /**
+     * Gets the application attempt identifier.
+     * 
+     * @return the application attempt identifier
+     */
+    public synchronized ApplicationAttemptId getApplicationAttemptId() {
+      return applicationAttemptId;
+    }
+
+    /**
+     * Gets the application identifier.
+     * 
+     * @return the application identifier
+     */
+    public synchronized ApplicationId getApplicationId() {
+      return applicationAttemptId.getApplicationId();
+    }
+  }
+}

+ 265 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java

@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This secret manager instance is used by the AMRMProxyService to generate and
+ * manage tokens.
+ */
+public class AMRMProxyTokenSecretManager extends
+    SecretManager<AMRMTokenIdentifier> {
+
+  private static final Log LOG = LogFactory
+      .getLog(AMRMProxyTokenSecretManager.class);
+
+  private int serialNo = new SecureRandom().nextInt();
+  private MasterKeyData nextMasterKey;
+  private MasterKeyData currentMasterKey;
+
+  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+  private final Lock readLock = readWriteLock.readLock();
+  private final Lock writeLock = readWriteLock.writeLock();
+
+  private final Timer timer;
+  private final long rollingInterval;
+  private final long activationDelay;
+
+  private final Set<ApplicationAttemptId> appAttemptSet =
+      new HashSet<ApplicationAttemptId>();
+
+  /**
+   * Create an {@link AMRMProxyTokenSecretManager}.
+   */
+  public AMRMProxyTokenSecretManager(Configuration conf) {
+    this.timer = new Timer();
+    this.rollingInterval =
+        conf.getLong(
+            YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+            YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
+    // Adding delay = 1.5 * expiry interval makes sure that all active AMs get
+    // the updated shared-key.
+    this.activationDelay =
+        (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
+    LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
+        + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay
+        + " ms");
+    if (rollingInterval <= activationDelay * 2) {
+      throw new IllegalArgumentException(
+          YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
+              + " should be more than 3 X "
+              + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
+    }
+  }
+
+  public void start() {
+    if (this.currentMasterKey == null) {
+      this.currentMasterKey = createNewMasterKey();
+    }
+    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
+        rollingInterval);
+  }
+
+  public void stop() {
+    this.timer.cancel();
+  }
+
+  public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Application finished, removing password for "
+          + appAttemptId);
+      this.appAttemptSet.remove(appAttemptId);
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  private class MasterKeyRoller extends TimerTask {
+    @Override
+    public void run() {
+      rollMasterKey();
+    }
+  }
+
+  @Private
+  void rollMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Rolling master-key for amrm-tokens");
+      this.nextMasterKey = createNewMasterKey();
+      this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  private class NextKeyActivator extends TimerTask {
+    @Override
+    public void run() {
+      activateNextMasterKey();
+    }
+  }
+
+  public void activateNextMasterKey() {
+    this.writeLock.lock();
+    try {
+      LOG.info("Activating next master key with id: "
+          + this.nextMasterKey.getMasterKey().getKeyId());
+      this.currentMasterKey = this.nextMasterKey;
+      this.nextMasterKey = null;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  public MasterKeyData createNewMasterKey() {
+    this.writeLock.lock();
+    try {
+      return new MasterKeyData(serialNo++, generateSecret());
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
+      ApplicationAttemptId appAttemptId) {
+    this.writeLock.lock();
+    try {
+      LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
+      AMRMTokenIdentifier identifier =
+          new AMRMTokenIdentifier(appAttemptId, getMasterKey()
+              .getMasterKey().getKeyId());
+      byte[] password = this.createPassword(identifier);
+      appAttemptSet.add(appAttemptId);
+      return new Token<AMRMTokenIdentifier>(identifier.getBytes(),
+          password, identifier.getKind(), new Text());
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  // If nextMasterKey is not Null, then return nextMasterKey
+  // otherwise return currentMasterKey.
+  @VisibleForTesting
+  public MasterKeyData getMasterKey() {
+    this.readLock.lock();
+    try {
+      return nextMasterKey == null ? currentMasterKey : nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   * Retrieve the password for the given {@link AMRMTokenIdentifier}. Used by
+   * RPC layer to validate a remote {@link AMRMTokenIdentifier}.
+   */
+  @Override
+  public byte[] retrievePassword(AMRMTokenIdentifier identifier)
+      throws InvalidToken {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to retrieve password for "
+            + applicationAttemptId);
+      }
+      if (!appAttemptSet.contains(applicationAttemptId)) {
+        throw new InvalidToken(applicationAttemptId
+            + " not found in AMRMProxyTokenSecretManager.");
+      }
+      if (identifier.getKeyId() == this.currentMasterKey.getMasterKey()
+          .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+            this.currentMasterKey.getSecretKey());
+      } else if (nextMasterKey != null
+          && identifier.getKeyId() == this.nextMasterKey.getMasterKey()
+              .getKeyId()) {
+        return createPassword(identifier.getBytes(),
+            this.nextMasterKey.getSecretKey());
+      }
+      throw new InvalidToken("Invalid AMRMToken from "
+          + applicationAttemptId);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   * Creates an empty TokenId to be used for de-serializing an
+   * {@link AMRMTokenIdentifier} by the RPC layer.
+   */
+  @Override
+  public AMRMTokenIdentifier createIdentifier() {
+    return new AMRMTokenIdentifier();
+  }
+
+  @Private
+  @VisibleForTesting
+  public MasterKeyData getNextMasterKeyData() {
+    this.readLock.lock();
+    try {
+      return this.nextMasterKey;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
+  @Private
+  protected byte[] createPassword(AMRMTokenIdentifier identifier) {
+    this.readLock.lock();
+    try {
+      ApplicationAttemptId applicationAttemptId =
+          identifier.getApplicationAttemptId();
+      LOG.info("Creating password for " + applicationAttemptId);
+      return createPassword(identifier.getBytes(), getMasterKey()
+          .getSecretKey());
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+}

+ 102 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java

@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implements the RequestInterceptor interface and provides common functionality
+ * which can can be used and/or extended by other concrete intercepter classes.
+ *
+ */
+public abstract class AbstractRequestInterceptor implements
+    RequestInterceptor {
+  private Configuration conf;
+  private AMRMProxyApplicationContext appContext;
+  private RequestInterceptor nextInterceptor;
+
+  /**
+   * Sets the {@link RequestInterceptor} in the chain.
+   */
+  @Override
+  public void setNextInterceptor(RequestInterceptor nextInterceptor) {
+    this.nextInterceptor = nextInterceptor;
+  }
+
+  /**
+   * Sets the {@link Configuration}.
+   */
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.setConf(conf);
+    }
+  }
+
+  /**
+   * Gets the {@link Configuration}.
+   */
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Initializes the {@link RequestInterceptor}.
+   */
+  @Override
+  public void init(AMRMProxyApplicationContext appContext) {
+    Preconditions.checkState(this.appContext == null,
+        "init is called multiple times on this interceptor: "
+            + this.getClass().getName());
+    this.appContext = appContext;
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.init(appContext);
+    }
+  }
+
+  /**
+   * Disposes the {@link RequestInterceptor}.
+   */
+  @Override
+  public void shutdown() {
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.shutdown();
+    }
+  }
+
+  /**
+   * Gets the next {@link RequestInterceptor} in the chain.
+   */
+  @Override
+  public RequestInterceptor getNextInterceptor() {
+    return this.nextInterceptor;
+  }
+
+  /**
+   * Gets the {@link AMRMProxyApplicationContext}.
+   */
+  public AMRMProxyApplicationContext getApplicationContext() {
+    return this.appContext;
+  }
+}

+ 138 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java

@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the AbstractRequestInterceptor class and provides an implementation
+ * that simply forwards the AM requests to the cluster resource manager.
+ *
+ */
+public final class DefaultRequestInterceptor extends
+    AbstractRequestInterceptor {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(DefaultRequestInterceptor.class);
+  private ApplicationMasterProtocol rmClient;
+  private UserGroupInformation user = null;
+
+  @Override
+  public void init(AMRMProxyApplicationContext appContext) {
+    super.init(appContext);
+    try {
+      user =
+          UserGroupInformation.createProxyUser(appContext
+              .getApplicationAttemptId().toString(), UserGroupInformation
+              .getCurrentUser());
+      user.addToken(appContext.getAMRMToken());
+      final Configuration conf = this.getConf();
+
+      rmClient =
+          user.doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+            @Override
+            public ApplicationMasterProtocol run() throws Exception {
+              return ClientRMProxy.createRMProxy(conf,
+                  ApplicationMasterProtocol.class);
+            }
+          });
+    } catch (IOException e) {
+      String message =
+          "Error while creating of RM app master service proxy for attemptId:"
+              + appContext.getApplicationAttemptId().toString();
+      if (user != null) {
+        message += ", user: " + user;
+      }
+
+      LOG.info(message);
+      throw new YarnRuntimeException(message, e);
+    } catch (Exception e) {
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      final RegisterApplicationMasterRequest request)
+      throws YarnException, IOException {
+    LOG.info("Forwarding registration request to the real YARN RM");
+    return rmClient.registerApplicationMaster(request);
+  }
+
+  @Override
+  public AllocateResponse allocate(final AllocateRequest request)
+      throws YarnException, IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Forwarding allocate request to the real YARN RM");
+    }
+    AllocateResponse allocateResponse = rmClient.allocate(request);
+    if (allocateResponse.getAMRMToken() != null) {
+      updateAMRMToken(allocateResponse.getAMRMToken());
+    }
+
+    return allocateResponse;
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      final FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    LOG.info("Forwarding finish application request to "
+        + "the real YARN Resource Manager");
+    return rmClient.finishApplicationMaster(request);
+  }
+
+  @Override
+  public void setNextInterceptor(RequestInterceptor next) {
+    throw new YarnRuntimeException(
+        "setNextInterceptor is being called on DefaultRequestInterceptor,"
+            + "which should be the last one in the chain "
+            + "Check if the interceptor pipeline configuration is correct");
+  }
+
+  private void updateAMRMToken(Token token) throws IOException {
+    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
+        new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
+            token.getIdentifier().array(), token.getPassword().array(),
+            new Text(token.getKind()), new Text(token.getService()));
+    // Preserve the token service sent by the RM when adding the token
+    // to ensure we replace the previous token setup by the RM.
+    // Afterwards we can update the service address for the RPC layer.
+    user.addToken(amrmToken);
+    amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConf()));
+  }
+}

+ 71 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java

@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+
+/**
+ * Defines the contract to be implemented by the request intercepter classes,
+ * that can be used to intercept and inspect messages sent from the application
+ * master to the resource manager.
+ */
+public interface RequestInterceptor extends ApplicationMasterProtocol,
+    Configurable {
+  /**
+   * This method is called for initializing the intercepter. This is guaranteed
+   * to be called only once in the lifetime of this instance.
+   *
+   * @param ctx
+   */
+  void init(AMRMProxyApplicationContext ctx);
+
+  /**
+   * This method is called to release the resources held by the intercepter.
+   * This will be called when the application pipeline is being destroyed. The
+   * concrete implementations should dispose the resources and forward the
+   * request to the next intercepter, if any.
+   */
+  void shutdown();
+
+  /**
+   * Sets the next intercepter in the pipeline. The concrete implementation of
+   * this interface should always pass the request to the nextInterceptor after
+   * inspecting the message. The last intercepter in the chain is responsible to
+   * send the messages to the resource manager service and so the last
+   * intercepter will not receive this method call.
+   *
+   * @param nextInterceptor
+   */
+  void setNextInterceptor(RequestInterceptor nextInterceptor);
+
+  /**
+   * Returns the next intercepter in the chain.
+   * 
+   * @return the next intercepter in the chain
+   */
+  RequestInterceptor getNextInterceptor();
+
+  /**
+   * Returns the context.
+   * 
+   * @return the context
+   */
+  AMRMProxyApplicationContext getApplicationContext();
+}

+ 39 - 28
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -42,7 +42,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
@@ -51,7 +50,6 @@ import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.service.Service;
@@ -92,6 +90,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
@@ -103,6 +102,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -135,6 +135,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
@@ -172,6 +173,8 @@ public class ContainerManagerImpl extends CompositeService implements
   private boolean serviceStopped = false;
   private final ReadLock readLock;
   private final WriteLock writeLock;
+  private AMRMProxyService amrmProxyService;
+  private boolean amrmProxyEnabled = false;
 
   private long waitForContainersOnShutdownMillis;
 
@@ -235,6 +238,20 @@ public class ContainerManagerImpl extends CompositeService implements
     addService(sharedCacheUploader);
     dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);
 
+    amrmProxyEnabled =
+        conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
+            YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+
+    if (amrmProxyEnabled) {
+      LOG.info("AMRMProxyService is enabled. "
+          + "All the AM->RM requests will be intercepted by the proxy");
+      this.amrmProxyService =
+          new AMRMProxyService(this.context, this.dispatcher);
+      addService(this.amrmProxyService);
+    } else {
+      LOG.info("AMRMProxyService is disabled");
+    }
+
     waitForContainersOnShutdownMillis =
         conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
             YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
@@ -246,6 +263,10 @@ public class ContainerManagerImpl extends CompositeService implements
     recover();
   }
 
+  public boolean isARMRMProxyEnabled() {
+    return amrmProxyEnabled;
+  }
+
   @SuppressWarnings("unchecked")
   private void recover() throws IOException, URISyntaxException {
     NMStateStoreService stateStore = context.getNMStateStore();
@@ -314,7 +335,8 @@ public class ContainerManagerImpl extends CompositeService implements
         + " with exit code " + rcs.getExitCode());
 
     if (context.getApplications().containsKey(appId)) {
-      Credentials credentials = parseCredentials(launchContext);
+      Credentials credentials =
+          YarnServerSecurityUtils.parseCredentials(launchContext);
       Container container = new ContainerImpl(getConfig(), dispatcher,
           context.getNMStateStore(), req.getContainerLaunchContext(),
           credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
@@ -737,8 +759,17 @@ public class ContainerManagerImpl extends CompositeService implements
         verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
           containerTokenIdentifier);
         containerId = containerTokenIdentifier.getContainerID();
-        startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
-          request);
+
+        // Initialize the AMRMProxy service instance only if the container is of
+        // type AM and if the AMRMProxy service is enabled
+        if (isARMRMProxyEnabled()
+            && containerTokenIdentifier.getContainerType().equals(
+                ContainerType.APPLICATION_MASTER)) {
+          this.amrmProxyService.processApplicationStartRequest(request);
+        }
+
+        startContainerInternal(nmTokenIdentifier,
+            containerTokenIdentifier, request);
         succeededContainers.add(containerId);
       } catch (YarnException e) {
         failedContainers.put(containerId, SerializedException.newInstance(e));
@@ -751,7 +782,7 @@ public class ContainerManagerImpl extends CompositeService implements
     }
 
     return StartContainersResponse.newInstance(getAuxServiceMetaData(),
-      succeededContainers, failedContainers);
+        succeededContainers, failedContainers);
   }
 
   private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
@@ -844,7 +875,8 @@ public class ContainerManagerImpl extends CompositeService implements
       }
     }
 
-    Credentials credentials = parseCredentials(launchContext);
+    Credentials credentials =
+        YarnServerSecurityUtils.parseCredentials(launchContext);
 
     Container container =
         new ContainerImpl(getConfig(), this.dispatcher,
@@ -928,27 +960,6 @@ public class ContainerManagerImpl extends CompositeService implements
       nmTokenIdentifier);
   }
 
-  private Credentials parseCredentials(ContainerLaunchContext launchContext)
-      throws IOException {
-    Credentials credentials = new Credentials();
-    // //////////// Parse credentials
-    ByteBuffer tokens = launchContext.getTokens();
-
-    if (tokens != null) {
-      DataInputByteBuffer buf = new DataInputByteBuffer();
-      tokens.rewind();
-      buf.reset(tokens);
-      credentials.readTokenStorageStream(buf);
-      if (LOG.isDebugEnabled()) {
-        for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
-          LOG.debug(tk.getService() + " = " + tk.toString());
-        }
-      }
-    }
-    // //////////// End of parsing credentials
-    return credentials;
-  }
-
   /**
    * Stop a list of containers running on this NodeManager.
    */

+ 677 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java

@@ -0,0 +1,677 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+/**
+ * Base class for all the AMRMProxyService test cases. It provides utility
+ * methods that can be used by the concrete test case classes
+ *
+ */
+public abstract class BaseAMRMProxyTest {
+  private static final Log LOG = LogFactory
+      .getLog(BaseAMRMProxyTest.class);
+  /**
+   * The AMRMProxyService instance that will be used by all the test cases
+   */
+  private MockAMRMProxyService amrmProxyService;
+  /**
+   * Thread pool used for asynchronous operations
+   */
+  private static ExecutorService threadpool = Executors
+      .newCachedThreadPool();
+  private Configuration conf;
+  private AsyncDispatcher dispatcher;
+
+  protected MockAMRMProxyService getAMRMProxyService() {
+    Assert.assertNotNull(this.amrmProxyService);
+    return this.amrmProxyService;
+  }
+
+  @Before
+  public void setUp() {
+    this.conf = new YarnConfiguration();
+    this.conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+    String mockPassThroughInterceptorClass =
+        PassThroughRequestInterceptor.class.getName();
+
+    // Create a request intercepter pipeline for testing. The last one in the
+    // chain will call the mock resource manager. The others in the chain will
+    // simply forward it to the next one in the chain
+    this.conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+        mockPassThroughInterceptorClass + ","
+            + mockPassThroughInterceptorClass + ","
+            + mockPassThroughInterceptorClass + ","
+            + MockRequestInterceptor.class.getName());
+
+    this.dispatcher = new AsyncDispatcher();
+    this.dispatcher.init(conf);
+    this.dispatcher.start();
+    this.amrmProxyService = createAndStartAMRMProxyService();
+  }
+
+  @After
+  public void tearDown() {
+    amrmProxyService.stop();
+    amrmProxyService = null;
+    this.dispatcher.stop();
+  }
+
+  protected ExecutorService getThreadPool() {
+    return threadpool;
+  }
+
+  protected MockAMRMProxyService createAndStartAMRMProxyService() {
+    MockAMRMProxyService svc =
+        new MockAMRMProxyService(new NullContext(), dispatcher);
+    svc.init(conf);
+    svc.start();
+    return svc;
+  }
+
+  /**
+   * This helper method will invoke the specified function in parallel for each
+   * end point in the specified list using a thread pool and return the
+   * responses received from the function. It implements the logic required for
+   * dispatching requests in parallel and waiting for the responses. If any of
+   * the function call fails or times out, it will ignore and proceed with the
+   * rest. So the responses returned can be less than the number of end points
+   * specified
+   * 
+   * @param testContext
+   * @param func
+   * @return
+   */
+  protected <T, R> List<R> runInParallel(List<T> testContexts,
+      final Function<T, R> func) {
+    ExecutorCompletionService<R> completionService =
+        new ExecutorCompletionService<R>(this.getThreadPool());
+    LOG.info("Sending requests to endpoints asynchronously. Number of test contexts="
+        + testContexts.size());
+    for (int index = 0; index < testContexts.size(); index++) {
+      final T testContext = testContexts.get(index);
+
+      LOG.info("Adding request to threadpool for test context: "
+          + testContext.toString());
+
+      completionService.submit(new Callable<R>() {
+        @Override
+        public R call() throws Exception {
+          LOG.info("Sending request. Test context:"
+              + testContext.toString());
+
+          R response = null;
+          try {
+            response = func.invoke(testContext);
+            LOG.info("Successfully sent request for context: "
+                + testContext.toString());
+          } catch (Throwable ex) {
+            LOG.error("Failed to process request for context: "
+                + testContext);
+            response = null;
+          }
+
+          return response;
+        }
+      });
+    }
+
+    ArrayList<R> responseList = new ArrayList<R>();
+    LOG.info("Waiting for responses from endpoints. Number of contexts="
+        + testContexts.size());
+    for (int i = 0; i < testContexts.size(); ++i) {
+      try {
+        final Future<R> future = completionService.take();
+        final R response = future.get(3000, TimeUnit.MILLISECONDS);
+        responseList.add(response);
+      } catch (Throwable e) {
+        LOG.error("Failed to process request " + e.getMessage());
+      }
+    }
+
+    return responseList;
+  }
+
+  /**
+   * Helper method to register an application master using specified testAppId
+   * as the application identifier and return the response
+   * 
+   * @param testAppId
+   * @return
+   * @throws Exception
+   * @throws YarnException
+   * @throws IOException
+   */
+  protected RegisterApplicationMasterResponse registerApplicationMaster(
+      final int testAppId) throws Exception, YarnException, IOException {
+    final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);
+
+    return ugi
+        .getUser()
+        .doAs(
+            new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
+              @Override
+              public RegisterApplicationMasterResponse run()
+                  throws Exception {
+                getAMRMProxyService().initApp(
+                    ugi.getAppAttemptId(),
+                    ugi.getUser().getUserName());
+
+                final RegisterApplicationMasterRequest req =
+                    Records
+                        .newRecord(RegisterApplicationMasterRequest.class);
+                req.setHost(Integer.toString(testAppId));
+                req.setRpcPort(testAppId);
+                req.setTrackingUrl("");
+
+                RegisterApplicationMasterResponse response =
+                    getAMRMProxyService().registerApplicationMaster(req);
+                return response;
+              }
+            });
+  }
+
+  /**
+   * Helper method that can be used to register multiple application masters in
+   * parallel to the specified RM end points
+   * 
+   * @param testContexts - used to identify the requests
+   * @return
+   */
+  protected <T> List<RegisterApplicationMasterResponseInfo<T>> registerApplicationMastersInParallel(
+      final ArrayList<T> testContexts) {
+    List<RegisterApplicationMasterResponseInfo<T>> responses =
+        runInParallel(testContexts,
+            new Function<T, RegisterApplicationMasterResponseInfo<T>>() {
+              @Override
+              public RegisterApplicationMasterResponseInfo<T> invoke(
+                  T testContext) {
+                RegisterApplicationMasterResponseInfo<T> response = null;
+                try {
+                  int index = testContexts.indexOf(testContext);
+                  response =
+                      new RegisterApplicationMasterResponseInfo<T>(
+                          registerApplicationMaster(index), testContext);
+                  Assert.assertNotNull(response.getResponse());
+                  Assert.assertEquals(Integer.toString(index), response
+                      .getResponse().getQueue());
+
+                  LOG.info("Sucessfully registered application master with test context: "
+                      + testContext);
+                } catch (Throwable ex) {
+                  response = null;
+                  LOG.error("Failed to register application master with test context: "
+                      + testContext);
+                }
+
+                return response;
+              }
+            });
+
+    Assert.assertEquals(
+        "Number of responses received does not match with request",
+        testContexts.size(), responses.size());
+
+    Set<T> contextResponses = new TreeSet<T>();
+    for (RegisterApplicationMasterResponseInfo<T> item : responses) {
+      contextResponses.add(item.getTestContext());
+    }
+
+    for (T ep : testContexts) {
+      Assert.assertTrue(contextResponses.contains(ep));
+    }
+
+    return responses;
+  }
+
+  /**
+   * Unregisters the application master for specified application id
+   * 
+   * @param appId
+   * @param status
+   * @return
+   * @throws Exception
+   * @throws YarnException
+   * @throws IOException
+   */
+  protected FinishApplicationMasterResponse finishApplicationMaster(
+      final int appId, final FinalApplicationStatus status)
+      throws Exception, YarnException, IOException {
+
+    final ApplicationUserInfo ugi = getApplicationUserInfo(appId);
+
+    return ugi.getUser().doAs(
+        new PrivilegedExceptionAction<FinishApplicationMasterResponse>() {
+          @Override
+          public FinishApplicationMasterResponse run() throws Exception {
+            final FinishApplicationMasterRequest req =
+                Records.newRecord(FinishApplicationMasterRequest.class);
+            req.setDiagnostics("");
+            req.setTrackingUrl("");
+            req.setFinalApplicationStatus(status);
+
+            FinishApplicationMasterResponse response =
+                getAMRMProxyService().finishApplicationMaster(req);
+
+            getAMRMProxyService().stopApp(
+                ugi.getAppAttemptId().getApplicationId());
+
+            return response;
+          }
+        });
+  }
+
+  protected <T> List<FinishApplicationMasterResponseInfo<T>> finishApplicationMastersInParallel(
+      final ArrayList<T> testContexts) {
+    List<FinishApplicationMasterResponseInfo<T>> responses =
+        runInParallel(testContexts,
+            new Function<T, FinishApplicationMasterResponseInfo<T>>() {
+              @Override
+              public FinishApplicationMasterResponseInfo<T> invoke(
+                  T testContext) {
+                FinishApplicationMasterResponseInfo<T> response = null;
+                try {
+                  response =
+                      new FinishApplicationMasterResponseInfo<T>(
+                          finishApplicationMaster(
+                              testContexts.indexOf(testContext),
+                              FinalApplicationStatus.SUCCEEDED),
+                          testContext);
+                  Assert.assertNotNull(response.getResponse());
+
+                  LOG.info("Sucessfully finished application master with test contexts: "
+                      + testContext);
+                } catch (Throwable ex) {
+                  response = null;
+                  LOG.error("Failed to finish application master with test context: "
+                      + testContext);
+                }
+
+                return response;
+              }
+            });
+
+    Assert.assertEquals(
+        "Number of responses received does not match with request",
+        testContexts.size(), responses.size());
+
+    Set<T> contextResponses = new TreeSet<T>();
+    for (FinishApplicationMasterResponseInfo<T> item : responses) {
+      Assert.assertNotNull(item);
+      Assert.assertNotNull(item.getResponse());
+      contextResponses.add(item.getTestContext());
+    }
+
+    for (T ep : testContexts) {
+      Assert.assertTrue(contextResponses.contains(ep));
+    }
+
+    return responses;
+  }
+
+  protected AllocateResponse allocate(final int testAppId)
+      throws Exception, YarnException, IOException {
+    final AllocateRequest req = Records.newRecord(AllocateRequest.class);
+    req.setResponseId(testAppId);
+    return allocate(testAppId, req);
+  }
+
+  protected AllocateResponse allocate(final int testAppId,
+      final AllocateRequest request) throws Exception, YarnException,
+      IOException {
+
+    final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);
+
+    return ugi.getUser().doAs(
+        new PrivilegedExceptionAction<AllocateResponse>() {
+          @Override
+          public AllocateResponse run() throws Exception {
+            AllocateResponse response =
+                getAMRMProxyService().allocate(request);
+            return response;
+          }
+        });
+  }
+
+  protected ApplicationUserInfo getApplicationUserInfo(final int testAppId) {
+    final ApplicationAttemptId attemptId =
+        getApplicationAttemptId(testAppId);
+
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(attemptId.toString());
+    AMRMTokenIdentifier token = new AMRMTokenIdentifier(attemptId, 1);
+    ugi.addTokenIdentifier(token);
+    return new ApplicationUserInfo(ugi, attemptId);
+  }
+
+  protected List<ResourceRequest> createResourceRequests(String[] hosts,
+      int memory, int vCores, int priority, int containers)
+      throws Exception {
+    return createResourceRequests(hosts, memory, vCores, priority,
+        containers, null);
+  }
+
+  protected List<ResourceRequest> createResourceRequests(String[] hosts,
+      int memory, int vCores, int priority, int containers,
+      String labelExpression) throws Exception {
+    List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
+    for (String host : hosts) {
+      ResourceRequest hostReq =
+          createResourceRequest(host, memory, vCores, priority,
+              containers, labelExpression);
+      reqs.add(hostReq);
+      ResourceRequest rackReq =
+          createResourceRequest("/default-rack", memory, vCores, priority,
+              containers, labelExpression);
+      reqs.add(rackReq);
+    }
+
+    ResourceRequest offRackReq =
+        createResourceRequest(ResourceRequest.ANY, memory, vCores,
+            priority, containers, labelExpression);
+    reqs.add(offRackReq);
+    return reqs;
+  }
+
+  protected ResourceRequest createResourceRequest(String resource,
+      int memory, int vCores, int priority, int containers)
+      throws Exception {
+    return createResourceRequest(resource, memory, vCores, priority,
+        containers, null);
+  }
+
+  protected ResourceRequest createResourceRequest(String resource,
+      int memory, int vCores, int priority, int containers,
+      String labelExpression) throws Exception {
+    ResourceRequest req = Records.newRecord(ResourceRequest.class);
+    req.setResourceName(resource);
+    req.setNumContainers(containers);
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(priority);
+    req.setPriority(pri);
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(memory);
+    capability.setVirtualCores(vCores);
+    req.setCapability(capability);
+    if (labelExpression != null) {
+      req.setNodeLabelExpression(labelExpression);
+    }
+    return req;
+  }
+
+  /**
+   * Returns an ApplicationId with the specified identifier
+   * 
+   * @param testAppId
+   * @return
+   */
+  protected ApplicationId getApplicationId(int testAppId) {
+    return ApplicationId.newInstance(123456, testAppId);
+  }
+
+  /**
+   * Return an instance of ApplicationAttemptId using specified identifier. This
+   * identifier will be used for the ApplicationId too.
+   * 
+   * @param testAppId
+   * @return
+   */
+  protected ApplicationAttemptId getApplicationAttemptId(int testAppId) {
+    return ApplicationAttemptId.newInstance(getApplicationId(testAppId),
+        testAppId);
+  }
+
+  /**
+   * Return an instance of ApplicationAttemptId using specified identifier and
+   * application id
+   * 
+   * @param testAppId
+   * @return
+   */
+  protected ApplicationAttemptId getApplicationAttemptId(int testAppId,
+      ApplicationId appId) {
+    return ApplicationAttemptId.newInstance(appId, testAppId);
+  }
+
+  protected static class RegisterApplicationMasterResponseInfo<T> {
+    private RegisterApplicationMasterResponse response;
+    private T testContext;
+
+    RegisterApplicationMasterResponseInfo(
+        RegisterApplicationMasterResponse response, T testContext) {
+      this.response = response;
+      this.testContext = testContext;
+    }
+
+    public RegisterApplicationMasterResponse getResponse() {
+      return response;
+    }
+
+    public T getTestContext() {
+      return testContext;
+    }
+  }
+
+  protected static class FinishApplicationMasterResponseInfo<T> {
+    private FinishApplicationMasterResponse response;
+    private T testContext;
+
+    FinishApplicationMasterResponseInfo(
+        FinishApplicationMasterResponse response, T testContext) {
+      this.response = response;
+      this.testContext = testContext;
+    }
+
+    public FinishApplicationMasterResponse getResponse() {
+      return response;
+    }
+
+    public T getTestContext() {
+      return testContext;
+    }
+  }
+
+  protected static class ApplicationUserInfo {
+    private UserGroupInformation user;
+    private ApplicationAttemptId attemptId;
+
+    ApplicationUserInfo(UserGroupInformation user,
+        ApplicationAttemptId attemptId) {
+      this.user = user;
+      this.attemptId = attemptId;
+    }
+
+    public UserGroupInformation getUser() {
+      return this.user;
+    }
+
+    public ApplicationAttemptId getAppAttemptId() {
+      return this.attemptId;
+    }
+  }
+
+  protected static class MockAMRMProxyService extends AMRMProxyService {
+    public MockAMRMProxyService(Context nmContext,
+        AsyncDispatcher dispatcher) {
+      super(nmContext, dispatcher);
+    }
+
+    /**
+     * This method is used by the test code to initialize the pipeline. In the
+     * actual service, the initialization is called by the
+     * ContainerManagerImpl::StartContainers method
+     * 
+     * @param applicationId
+     * @param user
+     */
+    public void initApp(ApplicationAttemptId applicationId, String user) {
+      super.initializePipeline(applicationId, user, null, null);
+    }
+
+    public void stopApp(ApplicationId applicationId) {
+      super.stopApplication(applicationId);
+    }
+  }
+
+  /**
+   * The Function interface is used for passing method pointers that can be
+   * invoked asynchronously at a later point.
+   */
+  protected interface Function<T, R> {
+    public R invoke(T input);
+  }
+
+  protected class NullContext implements Context {
+
+    @Override
+    public NodeId getNodeId() {
+      return null;
+    }
+
+    @Override
+    public int getHttpPort() {
+      return 0;
+    }
+
+    @Override
+    public ConcurrentMap<ApplicationId, Application> getApplications() {
+      return null;
+    }
+
+    @Override
+    public Map<ApplicationId, Credentials> getSystemCredentialsForApps() {
+      return null;
+    }
+
+    @Override
+    public ConcurrentMap<ContainerId, Container> getContainers() {
+      return null;
+    }
+
+    @Override
+    public NMContainerTokenSecretManager getContainerTokenSecretManager() {
+      return null;
+    }
+
+    @Override
+    public NMTokenSecretManagerInNM getNMTokenSecretManager() {
+      return null;
+    }
+
+    @Override
+    public NodeHealthStatus getNodeHealthStatus() {
+      return null;
+    }
+
+    @Override
+    public ContainerManagementProtocol getContainerManager() {
+      return null;
+    }
+
+    @Override
+    public LocalDirsHandlerService getLocalDirsHandler() {
+      return null;
+    }
+
+    @Override
+    public ApplicationACLsManager getApplicationACLsManager() {
+      return null;
+    }
+
+    @Override
+    public NMStateStoreService getNMStateStore() {
+      return null;
+    }
+
+    @Override
+    public boolean getDecommissioned() {
+      return false;
+    }
+
+    @Override
+    public void setDecommissioned(boolean isDecommissioned) {
+    }
+
+    @Override
+    public ConcurrentLinkedQueue<LogAggregationReport> getLogAggregationStatusForApps() {
+      return null;
+    }
+
+    @Override
+    public NodeResourceMonitor getNodeResourceMonitor() {
+      return null;
+    }
+
+  }
+}

+ 65 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public class MockRequestInterceptor extends AbstractRequestInterceptor {
+
+  private MockResourceManagerFacade mockRM;
+
+  public MockRequestInterceptor() {
+  }
+
+  public void init(AMRMProxyApplicationContext appContext) {
+    super.init(appContext);
+    mockRM =
+        new MockResourceManagerFacade(new YarnConfiguration(
+            super.getConf()), 0);
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return mockRM.registerApplicationMaster(request);
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return mockRM.finishApplicationMaster(request);
+  }
+
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException, IOException {
+    return mockRM.allocate(request);
+  }
+}

+ 469 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java

@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.directory.api.util.Strings;
+import org.apache.directory.api.util.exception.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.mortbay.log.Log;
+
+/**
+ * Mock Resource Manager facade implementation that exposes all the methods
+ * implemented by the YARN RM. The behavior and the values returned by this mock
+ * implementation is expected by the unit test cases. So please change the
+ * implementation with care.
+ */
+public class MockResourceManagerFacade implements
+    ApplicationMasterProtocol, ApplicationClientProtocol {
+
+  private HashMap<String, List<ContainerId>> applicationContainerIdMap =
+      new HashMap<String, List<ContainerId>>();
+  private HashMap<ContainerId, Container> allocatedContainerMap =
+      new HashMap<ContainerId, Container>();
+  private AtomicInteger containerIndex = new AtomicInteger(0);
+  private Configuration conf;
+
+  public MockResourceManagerFacade(Configuration conf,
+      int startContainerIndex) {
+    this.conf = conf;
+    this.containerIndex.set(startContainerIndex);
+  }
+
+  private static String getAppIdentifier() throws IOException {
+    AMRMTokenIdentifier result = null;
+    UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
+    Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
+    for (TokenIdentifier tokenId : tokenIds) {
+      if (tokenId instanceof AMRMTokenIdentifier) {
+        result = (AMRMTokenIdentifier) tokenId;
+        break;
+      }
+    }
+    return result != null ? result.getApplicationAttemptId().toString()
+        : "";
+  }
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    String amrmToken = getAppIdentifier();
+    Log.info("Registering application attempt: " + amrmToken);
+
+    synchronized (applicationContainerIdMap) {
+      Assert.assertFalse("The application id is already registered: "
+          + amrmToken, applicationContainerIdMap.containsKey(amrmToken));
+      // Keep track of the containers that are returned to this application
+      applicationContainerIdMap.put(amrmToken,
+          new ArrayList<ContainerId>());
+    }
+
+    return RegisterApplicationMasterResponse.newInstance(null, null, null,
+        null, null, request.getHost(), null);
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    String amrmToken = getAppIdentifier();
+    Log.info("Finishing application attempt: " + amrmToken);
+
+    synchronized (applicationContainerIdMap) {
+      // Remove the containers that were being tracked for this application
+      Assert.assertTrue("The application id is NOT registered: "
+          + amrmToken, applicationContainerIdMap.containsKey(amrmToken));
+      List<ContainerId> ids = applicationContainerIdMap.remove(amrmToken);
+      for (ContainerId c : ids) {
+        allocatedContainerMap.remove(c);
+      }
+    }
+
+    return FinishApplicationMasterResponse
+        .newInstance(request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED ? true
+            : false);
+  }
+
+  protected ApplicationId getApplicationId(int id) {
+    return ApplicationId.newInstance(12345, id);
+  }
+
+  protected ApplicationAttemptId getApplicationAttemptId(int id) {
+    return ApplicationAttemptId.newInstance(getApplicationId(id), 1);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException, IOException {
+    if (request.getAskList() != null && request.getAskList().size() > 0
+        && request.getReleaseList() != null
+        && request.getReleaseList().size() > 0) {
+      Assert.fail("The mock RM implementation does not support receiving "
+          + "askList and releaseList in the same heartbeat");
+    }
+
+    String amrmToken = getAppIdentifier();
+
+    ArrayList<Container> containerList = new ArrayList<Container>();
+    if (request.getAskList() != null) {
+      for (ResourceRequest rr : request.getAskList()) {
+        for (int i = 0; i < rr.getNumContainers(); i++) {
+          ContainerId containerId =
+              ContainerId.newInstance(getApplicationAttemptId(1),
+                  containerIndex.incrementAndGet());
+          Container container = Records.newRecord(Container.class);
+          container.setId(containerId);
+          container.setPriority(rr.getPriority());
+
+          // We don't use the node for running containers in the test cases. So
+          // it is OK to hard code it to some dummy value
+          NodeId nodeId =
+              NodeId.newInstance(
+                  !Strings.isEmpty(rr.getResourceName()) ? rr
+                      .getResourceName() : "dummy", 1000);
+          container.setNodeId(nodeId);
+          container.setResource(rr.getCapability());
+          containerList.add(container);
+
+          synchronized (applicationContainerIdMap) {
+            // Keep track of the containers returned to this application. We
+            // will need it in future
+            Assert.assertTrue(
+                "The application id is Not registered before allocate(): "
+                    + amrmToken,
+                applicationContainerIdMap.containsKey(amrmToken));
+            List<ContainerId> ids =
+                applicationContainerIdMap.get(amrmToken);
+            ids.add(containerId);
+            this.allocatedContainerMap.put(containerId, container);
+          }
+        }
+      }
+    }
+
+    if (request.getReleaseList() != null
+        && request.getReleaseList().size() > 0) {
+      Log.info("Releasing containers: " + request.getReleaseList().size());
+      synchronized (applicationContainerIdMap) {
+        Assert.assertTrue(
+            "The application id is not registered before allocate(): "
+                + amrmToken,
+            applicationContainerIdMap.containsKey(amrmToken));
+        List<ContainerId> ids = applicationContainerIdMap.get(amrmToken);
+
+        for (ContainerId id : request.getReleaseList()) {
+          boolean found = false;
+          for (ContainerId c : ids) {
+            if (c.equals(id)) {
+              found = true;
+              break;
+            }
+          }
+
+          Assert.assertTrue(
+              "ContainerId " + id
+                  + " being released is not valid for application: "
+                  + conf.get("AMRMTOKEN"), found);
+
+          ids.remove(id);
+
+          // Return the released container back to the AM with new fake Ids. The
+          // test case does not care about the IDs. The IDs are faked because
+          // otherwise the LRM will throw duplication identifier exception. This
+          // returning of fake containers is ONLY done for testing purpose - for
+          // the test code to get confirmation that the sub-cluster resource
+          // managers received the release request
+          ContainerId fakeContainerId =
+              ContainerId.newInstance(getApplicationAttemptId(1),
+                  containerIndex.incrementAndGet());
+          Container fakeContainer = allocatedContainerMap.get(id);
+          fakeContainer.setId(fakeContainerId);
+          containerList.add(fakeContainer);
+        }
+      }
+    }
+
+    Log.info("Allocating containers: " + containerList.size()
+        + " for application attempt: " + conf.get("AMRMTOKEN"));
+    return AllocateResponse.newInstance(0,
+        new ArrayList<ContainerStatus>(), containerList,
+        new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
+        new ArrayList<NMToken>(),
+        new ArrayList<ContainerResourceIncrease>(),
+        new ArrayList<ContainerResourceDecrease>());
+  }
+
+  @Override
+  public GetApplicationReportResponse getApplicationReport(
+      GetApplicationReportRequest request) throws YarnException,
+      IOException {
+
+    GetApplicationReportResponse response =
+        Records.newRecord(GetApplicationReportResponse.class);
+    ApplicationReport report = Records.newRecord(ApplicationReport.class);
+    report.setYarnApplicationState(YarnApplicationState.ACCEPTED);
+    report.setApplicationId(request.getApplicationId());
+    report.setCurrentApplicationAttemptId(ApplicationAttemptId
+        .newInstance(request.getApplicationId(), 1));
+    response.setApplicationReport(report);
+    return response;
+  }
+
+  @Override
+  public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+      GetApplicationAttemptReportRequest request) throws YarnException,
+      IOException {
+    GetApplicationAttemptReportResponse response =
+        Records.newRecord(GetApplicationAttemptReportResponse.class);
+    ApplicationAttemptReport report =
+        Records.newRecord(ApplicationAttemptReport.class);
+    report.setApplicationAttemptId(request.getApplicationAttemptId());
+    report
+        .setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED);
+    response.setApplicationAttemptReport(report);
+    return response;
+  }
+
+  @Override
+  public GetNewApplicationResponse getNewApplication(
+      GetNewApplicationRequest request) throws YarnException, IOException {
+    return null;
+  }
+
+  @Override
+  public SubmitApplicationResponse submitApplication(
+      SubmitApplicationRequest request) throws YarnException, IOException {
+    return null;
+  }
+
+  @Override
+  public KillApplicationResponse forceKillApplication(
+      KillApplicationRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetClusterMetricsResponse getClusterMetrics(
+      GetClusterMetricsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetApplicationsResponse getApplications(
+      GetApplicationsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetClusterNodesResponse getClusterNodes(
+      GetClusterNodesRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetQueueUserAclsInfoResponse getQueueUserAcls(
+      GetQueueUserAclsInfoRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetDelegationTokenResponse getDelegationToken(
+      GetDelegationTokenRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+      MoveApplicationAcrossQueuesRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetApplicationAttemptsResponse getApplicationAttempts(
+      GetApplicationAttemptsRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetContainerReportResponse getContainerReport(
+      GetContainerReportRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetContainersResponse getContainers(GetContainersRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetNodesToLabelsResponse getNodeToLabels(
+      GetNodesToLabelsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetClusterNodeLabelsResponse getClusterNodeLabels(
+      GetClusterNodeLabelsRequest request) throws YarnException,
+      IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetLabelsToNodesResponse getLabelsToNodes(
+      GetLabelsToNodesRequest request) throws YarnException, IOException {
+    return null;
+  }
+
+  @Override
+  public UpdateApplicationPriorityResponse updateApplicationPriority(
+      UpdateApplicationPriorityRequest request) throws YarnException,
+      IOException {
+    return null;
+  }
+}

+ 58 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Mock intercepter that does not do anything other than forwarding it to the
+ * next intercepter in the chain
+ *
+ */
+public class PassThroughRequestInterceptor extends
+    AbstractRequestInterceptor {
+
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return getNextInterceptor().registerApplicationMaster(request);
+  }
+
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return getNextInterceptor().finishApplicationMaster(request);
+  }
+
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException, IOException {
+    return getNextInterceptor().allocate(request);
+  }
+}

+ 484 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java

@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAMRMProxyService extends BaseAMRMProxyTest {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestAMRMProxyService.class);
+
+  /**
+   * Test if the pipeline is created properly.
+   */
+  @Test
+  public void testRequestInterceptorChainCreation() throws Exception {
+    RequestInterceptor root =
+        super.getAMRMProxyService().createRequestInterceptorChain();
+    int index = 0;
+    while (root != null) {
+      switch (index) {
+      case 0:
+      case 1:
+      case 2:
+        Assert.assertEquals(PassThroughRequestInterceptor.class.getName(),
+            root.getClass().getName());
+        break;
+      case 3:
+        Assert.assertEquals(MockRequestInterceptor.class.getName(), root
+            .getClass().getName());
+        break;
+      }
+
+      root = root.getNextInterceptor();
+      index++;
+    }
+
+    Assert.assertEquals(
+        "The number of interceptors in chain does not match",
+        Integer.toString(4), Integer.toString(index));
+
+  }
+
+  /**
+   * Tests registration of a single application master.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testRegisterOneApplicationMaster() throws Exception {
+    // The testAppId identifier is used as host name and the mock resource
+    // manager return it as the queue name. Assert that we received the queue
+    // name
+    int testAppId = 1;
+    RegisterApplicationMasterResponse response1 =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(response1);
+    Assert.assertEquals(Integer.toString(testAppId), response1.getQueue());
+  }
+
+  /**
+   * Tests the registration of multiple application master serially one at a
+   * time.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testRegisterMulitpleApplicationMasters() throws Exception {
+    for (int testAppId = 0; testAppId < 3; testAppId++) {
+      RegisterApplicationMasterResponse response =
+          registerApplicationMaster(testAppId);
+      Assert.assertNotNull(response);
+      Assert
+          .assertEquals(Integer.toString(testAppId), response.getQueue());
+    }
+  }
+
+  /**
+   * Tests the registration of multiple application masters using multiple
+   * threads in parallel.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testRegisterMulitpleApplicationMastersInParallel()
+      throws Exception {
+    int numberOfRequests = 5;
+    ArrayList<String> testContexts =
+        CreateTestRequestIdentifiers(numberOfRequests);
+    super.registerApplicationMastersInParallel(testContexts);
+  }
+
+  private ArrayList<String> CreateTestRequestIdentifiers(
+      int numberOfRequests) {
+    ArrayList<String> testContexts = new ArrayList<String>();
+    LOG.info("Creating " + numberOfRequests + " contexts for testing");
+    for (int ep = 0; ep < numberOfRequests; ep++) {
+      testContexts.add("test-endpoint-" + Integer.toString(ep));
+      LOG.info("Created test context: " + testContexts.get(ep));
+    }
+    return testContexts;
+  }
+
+  @Test
+  public void testFinishOneApplicationMasterWithSuccess() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    Assert.assertEquals(Integer.toString(testAppId),
+        registerResponse.getQueue());
+
+    FinishApplicationMasterResponse finshResponse =
+        finishApplicationMaster(testAppId,
+            FinalApplicationStatus.SUCCEEDED);
+
+    Assert.assertNotNull(finshResponse);
+    Assert.assertEquals(true, finshResponse.getIsUnregistered());
+  }
+
+  @Test
+  public void testFinishOneApplicationMasterWithFailure() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    Assert.assertEquals(Integer.toString(testAppId),
+        registerResponse.getQueue());
+
+    FinishApplicationMasterResponse finshResponse =
+        finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED);
+
+    Assert.assertNotNull(finshResponse);
+    Assert.assertEquals(false, finshResponse.getIsUnregistered());
+
+    try {
+      // Try to finish an application master that is already finished.
+      finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+      Assert
+          .fail("The request to finish application master should have failed");
+    } catch (Throwable ex) {
+      // This is expected. So nothing required here.
+      LOG.info("Finish registration failed as expected because it was not registered");
+    }
+  }
+
+  @Test
+  public void testFinishInvalidApplicationMaster() throws Exception {
+    try {
+      // Try to finish an application master that was not registered.
+      finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED);
+      Assert
+          .fail("The request to finish application master should have failed");
+    } catch (Throwable ex) {
+      // This is expected. So nothing required here.
+      LOG.info("Finish registration failed as expected because it was not registered");
+    }
+  }
+
+  @Test
+  public void testFinishMulitpleApplicationMasters() throws Exception {
+    int numberOfRequests = 3;
+    for (int index = 0; index < numberOfRequests; index++) {
+      RegisterApplicationMasterResponse registerResponse =
+          registerApplicationMaster(index);
+      Assert.assertNotNull(registerResponse);
+      Assert.assertEquals(Integer.toString(index),
+          registerResponse.getQueue());
+    }
+
+    // Finish in reverse sequence
+    for (int index = numberOfRequests - 1; index >= 0; index--) {
+      FinishApplicationMasterResponse finshResponse =
+          finishApplicationMaster(index, FinalApplicationStatus.SUCCEEDED);
+
+      Assert.assertNotNull(finshResponse);
+      Assert.assertEquals(true, finshResponse.getIsUnregistered());
+
+      // Assert that the application has been removed from the collection
+      Assert.assertTrue(this.getAMRMProxyService()
+          .getPipelines().size() == index);
+    }
+
+    try {
+      // Try to finish an application master that is already finished.
+      finishApplicationMaster(1, FinalApplicationStatus.SUCCEEDED);
+      Assert
+          .fail("The request to finish application master should have failed");
+    } catch (Throwable ex) {
+      // This is expected. So nothing required here.
+      LOG.info("Finish registration failed as expected because it was not registered");
+    }
+
+    try {
+      // Try to finish an application master that was not registered.
+      finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED);
+      Assert
+          .fail("The request to finish application master should have failed");
+    } catch (Throwable ex) {
+      // This is expected. So nothing required here.
+      LOG.info("Finish registration failed as expected because it was not registered");
+    }
+  }
+
+  @Test
+  public void testFinishMulitpleApplicationMastersInParallel()
+      throws Exception {
+    int numberOfRequests = 5;
+    ArrayList<String> testContexts = new ArrayList<String>();
+    LOG.info("Creating " + numberOfRequests + " contexts for testing");
+    for (int i = 0; i < numberOfRequests; i++) {
+      testContexts.add("test-endpoint-" + Integer.toString(i));
+      LOG.info("Created test context: " + testContexts.get(i));
+
+      RegisterApplicationMasterResponse registerResponse =
+          registerApplicationMaster(i);
+      Assert.assertNotNull(registerResponse);
+      Assert
+          .assertEquals(Integer.toString(i), registerResponse.getQueue());
+    }
+
+    finishApplicationMastersInParallel(testContexts);
+  }
+
+  @Test
+  public void testAllocateRequestWithNullValues() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    Assert.assertEquals(Integer.toString(testAppId),
+        registerResponse.getQueue());
+
+    AllocateResponse allocateResponse = allocate(testAppId);
+    Assert.assertNotNull(allocateResponse);
+
+    FinishApplicationMasterResponse finshResponse =
+        finishApplicationMaster(testAppId,
+            FinalApplicationStatus.SUCCEEDED);
+
+    Assert.assertNotNull(finshResponse);
+    Assert.assertEquals(true, finshResponse.getIsUnregistered());
+  }
+
+  @Test
+  public void testAllocateRequestWithoutRegistering() throws Exception {
+
+    try {
+      // Try to allocate an application master without registering.
+      allocate(1);
+      Assert
+          .fail("The request to allocate application master should have failed");
+    } catch (Throwable ex) {
+      // This is expected. So nothing required here.
+      LOG.info("AllocateRequest failed as expected because AM was not registered");
+    }
+  }
+
+  @Test
+  public void testAllocateWithOneResourceRequest() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    getContainersAndAssert(testAppId, 1);
+    finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+  }
+
+  @Test
+  public void testAllocateWithMultipleResourceRequest() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    getContainersAndAssert(testAppId, 10);
+    finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+  }
+
+  @Test
+  public void testAllocateAndReleaseContainers() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    List<Container> containers = getContainersAndAssert(testAppId, 10);
+    releaseContainersAndAssert(testAppId, containers);
+    finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+  }
+
+  @Test
+  public void testAllocateAndReleaseContainersForMultipleAM()
+      throws Exception {
+    int numberOfApps = 5;
+    for (int testAppId = 0; testAppId < numberOfApps; testAppId++) {
+      RegisterApplicationMasterResponse registerResponse =
+          registerApplicationMaster(testAppId);
+      Assert.assertNotNull(registerResponse);
+      List<Container> containers = getContainersAndAssert(testAppId, 10);
+      releaseContainersAndAssert(testAppId, containers);
+    }
+    for (int testAppId = 0; testAppId < numberOfApps; testAppId++) {
+      finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+    }
+  }
+
+  @Test
+  public void testAllocateAndReleaseContainersForMultipleAMInParallel()
+      throws Exception {
+    int numberOfApps = 6;
+    ArrayList<Integer> tempAppIds = new ArrayList<Integer>();
+    for (int i = 0; i < numberOfApps; i++) {
+      tempAppIds.add(new Integer(i));
+    }
+
+    final ArrayList<Integer> appIds = tempAppIds;
+    List<Integer> responses =
+        runInParallel(appIds, new Function<Integer, Integer>() {
+          @Override
+          public Integer invoke(Integer testAppId) {
+            try {
+              RegisterApplicationMasterResponse registerResponse =
+                  registerApplicationMaster(testAppId);
+              Assert.assertNotNull("response is null", registerResponse);
+              List<Container> containers =
+                  getContainersAndAssert(testAppId, 10);
+              releaseContainersAndAssert(testAppId, containers);
+
+              LOG.info("Sucessfully registered application master with appId: "
+                  + testAppId);
+            } catch (Throwable ex) {
+              LOG.error(
+                  "Failed to register application master with appId: "
+                      + testAppId, ex);
+              testAppId = null;
+            }
+
+            return testAppId;
+          }
+        });
+
+    Assert.assertEquals(
+        "Number of responses received does not match with request",
+        appIds.size(), responses.size());
+
+    for (Integer testAppId : responses) {
+      Assert.assertNotNull(testAppId);
+      finishApplicationMaster(testAppId.intValue(),
+          FinalApplicationStatus.SUCCEEDED);
+    }
+  }
+
+  private List<Container> getContainersAndAssert(int appId,
+      int numberOfResourceRequests) throws Exception {
+    AllocateRequest allocateRequest =
+        Records.newRecord(AllocateRequest.class);
+    allocateRequest.setResponseId(1);
+
+    List<Container> containers =
+        new ArrayList<Container>(numberOfResourceRequests);
+    List<ResourceRequest> askList =
+        new ArrayList<ResourceRequest>(numberOfResourceRequests);
+    for (int testAppId = 0; testAppId < numberOfResourceRequests; testAppId++) {
+      askList.add(createResourceRequest(
+          "test-node-" + Integer.toString(testAppId), 6000, 2,
+          testAppId % 5, 1));
+    }
+
+    allocateRequest.setAskList(askList);
+
+    AllocateResponse allocateResponse = allocate(appId, allocateRequest);
+    Assert.assertNotNull("allocate() returned null response",
+        allocateResponse);
+
+    containers.addAll(allocateResponse.getAllocatedContainers());
+
+    // Send max 10 heart beats to receive all the containers. If not, we will
+    // fail the test
+    int numHeartbeat = 0;
+    while (containers.size() < askList.size() && numHeartbeat++ < 10) {
+      allocateResponse =
+          allocate(appId, Records.newRecord(AllocateRequest.class));
+      Assert.assertNotNull("allocate() returned null response",
+          allocateResponse);
+
+      containers.addAll(allocateResponse.getAllocatedContainers());
+
+      LOG.info("Number of allocated containers in this request: "
+          + Integer.toString(allocateResponse.getAllocatedContainers()
+              .size()));
+      LOG.info("Total number of allocated containers: "
+          + Integer.toString(containers.size()));
+      Thread.sleep(10);
+    }
+
+    // We broadcast the request, the number of containers we received will be
+    // higher than we ask
+    Assert.assertTrue("The asklist count is not same as response",
+        askList.size() <= containers.size());
+    return containers;
+  }
+
+  private void releaseContainersAndAssert(int appId,
+      List<Container> containers) throws Exception {
+    Assert.assertTrue(containers.size() > 0);
+    AllocateRequest allocateRequest =
+        Records.newRecord(AllocateRequest.class);
+    allocateRequest.setResponseId(1);
+
+    List<ContainerId> relList =
+        new ArrayList<ContainerId>(containers.size());
+    for (Container container : containers) {
+      relList.add(container.getId());
+    }
+
+    allocateRequest.setReleaseList(relList);
+
+    AllocateResponse allocateResponse = allocate(appId, allocateRequest);
+    Assert.assertNotNull(allocateResponse);
+
+    // The way the mock resource manager is setup, it will return the containers
+    // that were released in the response. This is done because the UAMs run
+    // asynchronously and we need to if all the resource managers received the
+    // release it. The containers sent by the mock resource managers will be
+    // aggregated and returned back to us and we can assert if all the release
+    // lists reached the sub-clusters
+    List<Container> containersForReleasedContainerIds =
+        new ArrayList<Container>();
+    containersForReleasedContainerIds.addAll(allocateResponse
+        .getAllocatedContainers());
+
+    // Send max 10 heart beats to receive all the containers. If not, we will
+    // fail the test
+    int numHeartbeat = 0;
+    while (containersForReleasedContainerIds.size() < relList.size()
+        && numHeartbeat++ < 10) {
+      allocateResponse =
+          allocate(appId, Records.newRecord(AllocateRequest.class));
+      Assert.assertNotNull(allocateResponse);
+      containersForReleasedContainerIds.addAll(allocateResponse
+          .getAllocatedContainers());
+
+      LOG.info("Number of containers received in this request: "
+          + Integer.toString(allocateResponse.getAllocatedContainers()
+              .size()));
+      LOG.info("Total number of containers received: "
+          + Integer.toString(containersForReleasedContainerIds.size()));
+      Thread.sleep(10);
+    }
+
+    Assert.assertEquals(relList.size(),
+        containersForReleasedContainerIds.size());
+  }
+}

+ 6 - 63
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -40,9 +40,7 @@ import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -76,7 +74,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@@ -95,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -175,69 +173,13 @@ public class ApplicationMasterService extends AbstractService implements
     return this.masterServiceAddress;
   }
 
-  // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
-  // currently sets only the required id, but iterate through anyways just to be
-  // sure.
-  private AMRMTokenIdentifier selectAMRMTokenIdentifier(
-      UserGroupInformation remoteUgi) throws IOException {
-    AMRMTokenIdentifier result = null;
-    Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
-    for (TokenIdentifier tokenId : tokenIds) {
-      if (tokenId instanceof AMRMTokenIdentifier) {
-        result = (AMRMTokenIdentifier) tokenId;
-        break;
-      }
-    }
-
-    return result;
-  }
-
-  private AMRMTokenIdentifier authorizeRequest()
-      throws YarnException {
-
-    UserGroupInformation remoteUgi;
-    try {
-      remoteUgi = UserGroupInformation.getCurrentUser();
-    } catch (IOException e) {
-      String msg =
-          "Cannot obtain the user-name for authorizing ApplicationMaster. "
-              + "Got exception: " + StringUtils.stringifyException(e);
-      LOG.warn(msg);
-      throw RPCUtil.getRemoteException(msg);
-    }
-
-    boolean tokenFound = false;
-    String message = "";
-    AMRMTokenIdentifier appTokenIdentifier = null;
-    try {
-      appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
-      if (appTokenIdentifier == null) {
-        tokenFound = false;
-        message = "No AMRMToken found for user " + remoteUgi.getUserName();
-      } else {
-        tokenFound = true;
-      }
-    } catch (IOException e) {
-      tokenFound = false;
-      message =
-          "Got exception while looking for AMRMToken for user "
-              + remoteUgi.getUserName();
-    }
-
-    if (!tokenFound) {
-      LOG.warn(message);
-      throw RPCUtil.getRemoteException(message);
-    }
-
-    return appTokenIdentifier;
-  }
-
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
       RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
 
-    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+    AMRMTokenIdentifier amrmTokenIdentifier =
+        YarnServerSecurityUtils.authorizeRequest();
     ApplicationAttemptId applicationAttemptId =
         amrmTokenIdentifier.getApplicationAttemptId();
 
@@ -346,7 +288,7 @@ public class ApplicationMasterService extends AbstractService implements
       IOException {
 
     ApplicationAttemptId applicationAttemptId =
-        authorizeRequest().getApplicationAttemptId();
+        YarnServerSecurityUtils.authorizeRequest().getApplicationAttemptId();
     ApplicationId appId = applicationAttemptId.getApplicationId();
 
     RMApp rmApp =
@@ -430,7 +372,8 @@ public class ApplicationMasterService extends AbstractService implements
   public AllocateResponse allocate(AllocateRequest request)
       throws YarnException, IOException {
 
-    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+    AMRMTokenIdentifier amrmTokenIdentifier =
+        YarnServerSecurityUtils.authorizeRequest();
 
     ApplicationAttemptId appAttemptId =
         amrmTokenIdentifier.getApplicationAttemptId();