瀏覽代碼

YARN-103. Add a yarn AM-RM client module. Contributed by Bikas Saha.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1428554 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 12 年之前
父節點
當前提交
0a61990855

+ 2 - 0
hadoop-yarn-project/CHANGES.txt

@@ -28,6 +28,8 @@ Release 2.0.3-alpha - Unreleased
     YARN-230. RM Restart phase 1 - includes support for saving/restarting all
     applications on an RM bounce. (Bikas Saha via acmurthy)
 
+    YARN-103. Add a yarn AM-RM client module. (Bikas Saha via sseth)
+
   IMPROVEMENTS
 
     YARN-223. Update process tree instead of getting new process trees.

+ 7 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml

@@ -36,7 +36,13 @@
   	<dependency>
   		<groupId>org.apache.hadoop</groupId>
   		<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-		<scope>test</scope>
+      <scope>test</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>org.apache.hadoop</groupId>
+  		<artifactId>hadoop-yarn-server-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
   	</dependency>
       <dependency>
   		<groupId>org.apache.hadoop</groupId>

+ 153 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java

@@ -0,0 +1,153 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.Service;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface AMRMClient extends Service {
+
+  /**
+   * Value used to define no locality
+   */
+  static final String ANY = "*";
+
+  /**
+   * Object to represent container request for resources.
+   * Resources may be localized to nodes and racks.
+   * Resources may be assigned priorities.
+   * Can ask for multiple containers of a given type.
+   */
+  public static class ContainerRequest {
+    Resource capability;
+    String[] hosts;
+    String[] racks;
+    Priority priority;
+    int containerCount;
+        
+    public ContainerRequest(Resource capability, String[] hosts,
+        String[] racks, Priority priority, int containerCount) {
+      this.capability = capability;
+      this.hosts = (hosts != null ? hosts.clone() : null);
+      this.racks = (racks != null ? racks.clone() : null);
+      this.priority = priority;
+      this.containerCount = containerCount;
+    }
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Capability[").append(capability).append("]");
+      sb.append("Priority[").append(priority).append("]");
+      sb.append("ContainerCount[").append(containerCount).append("]");
+      return sb.toString();
+    }
+  }
+  
+  /**
+   * Register the application master. This must be called before any 
+   * other interaction
+   * @param appHostName Name of the host on which master is running
+   * @param appHostPort Port master is listening on
+   * @param appTrackingUrl URL at which the master info can be seen
+   * @return <code>RegisterApplicationMasterResponse</code>
+   * @throws YarnRemoteException
+   */
+  public RegisterApplicationMasterResponse 
+               registerApplicationMaster(String appHostName,
+                                         int appHostPort,
+                                         String appTrackingUrl) 
+               throws YarnRemoteException;
+  
+  /**
+   * Request additional containers and receive new container allocations.
+   * Requests made via <code>addContainerRequest</code> are sent to the 
+   * <code>ResourceManager</code>. New containers assigned to the master are 
+   * retrieved. Status of completed containers and node health updates are 
+   * also retrieved.
+   * This also doubles up as a heartbeat to the ResourceManager and must be 
+   * made periodically.
+   * The call may not always return any new allocations of containers.
+   * App should not make concurrent allocate requests. May cause request loss.
+   * @param progressIndicator Indicates progress made by the master
+   * @return the response of the allocate request
+   * @throws YarnRemoteException
+   */
+  public AllocateResponse allocate(float progressIndicator) 
+                           throws YarnRemoteException;
+  
+  /**
+   * Unregister the application master. This must be called in the end.
+   * @param appStatus Success/Failure status of the master
+   * @param appMessage Diagnostics message on failure
+   * @param appTrackingUrl New URL to get master info
+   * @throws YarnRemoteException
+   */
+  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+                                           String appMessage,
+                                           String appTrackingUrl) 
+               throws YarnRemoteException;
+  
+  /**
+   * Request containers for resources before calling <code>allocate</code>
+   * @param req Resource request
+   */
+  public void addContainerRequest(ContainerRequest req);
+  
+  /**
+   * Remove previous container request. The previous container request may have 
+   * already been sent to the ResourceManager. So even after the remove request 
+   * the app must be prepared to receive an allocation for the previous request 
+   * even after the remove request
+   * @param req Resource request
+   */
+  public void removeContainerRequest(ContainerRequest req);
+  
+  /**
+   * Release containers assigned by the Resource Manager. If the app cannot use
+   * the container or wants to give up the container then it can release them.
+   * The app needs to make new requests for the released resource capability if
+   * it still needs it. eg. it released non-local resources
+   * @param containerId
+   */
+  public void releaseAssignedContainer(ContainerId containerId);
+  
+  /**
+   * Get the currently available resources in the cluster.
+   * A valid value is available after a call to allocate has been made
+   * @return Currently available resources
+   */
+  public Resource getClusterAvailableResources();
+  
+  /**
+   * Get the current number of nodes in the cluster.
+   * A valid values is available after a call to allocate has been made
+   * @return Current number of nodes in the cluster
+   */
+  public int getClusterNodeCount();
+}

+ 410 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java

@@ -0,0 +1,410 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+@Unstable
+public class AMRMClientImpl extends AbstractService implements AMRMClient {
+
+  private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
+  
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+  
+  private int lastResponseId = 0;
+
+  protected AMRMProtocol rmClient;
+  protected final ApplicationAttemptId appAttemptId;  
+  protected Resource clusterAvailableResources;
+  protected int clusterNodeCount;
+  
+  //Key -> Priority
+  //Value -> Map
+  //Key->ResourceName (e.g., hostname, rackname, *)
+  //Value->Map
+  //Key->Resource Capability
+  //Value->ResourceRequest
+  protected final 
+  Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
+    remoteRequestsTable =
+    new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
+
+  protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
+      new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
+  protected final Set<ContainerId> release = new TreeSet<ContainerId>();
+  
+  public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
+    super(AMRMClientImpl.class.getName());
+    this.appAttemptId = appAttemptId;
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    super.init(conf);
+  }
+
+  @Override
+  public synchronized void start() {
+    final YarnConfiguration conf = new YarnConfiguration(getConfig());
+    final YarnRPC rpc = YarnRPC.create(conf);
+    final InetSocketAddress rmAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+    UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      String tokenURLEncodedStr = System.getenv().get(
+          ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+
+      try {
+        token.decodeFromUrlString(tokenURLEncodedStr);
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+
+      SecurityUtil.setTokenService(token, rmAddress);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AppMasterToken is " + token);
+      }
+      currentUser.addToken(token);
+    }
+
+    rmClient = currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+      @Override
+      public AMRMProtocol run() {
+        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress,
+            conf);
+      }
+    });
+    LOG.debug("Connecting to ResourceManager at " + rmAddress);
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    RPC.stopProxy(this.rmClient);
+    super.stop();
+  }
+  
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      String appHostName, int appHostPort, String appTrackingUrl)
+      throws YarnRemoteException {
+    // do this only once ???
+    RegisterApplicationMasterRequest request = recordFactory
+        .newRecordInstance(RegisterApplicationMasterRequest.class);
+    synchronized (this) {
+      request.setApplicationAttemptId(appAttemptId);      
+    }
+    request.setHost(appHostName);
+    request.setRpcPort(appHostPort);
+    if(appTrackingUrl != null) {
+      request.setTrackingUrl(appTrackingUrl);
+    }
+    RegisterApplicationMasterResponse response = rmClient
+        .registerApplicationMaster(request);
+    return response;
+  }
+
+  @Override
+  public AllocateResponse allocate(float progressIndicator) 
+      throws YarnRemoteException {
+    AllocateResponse allocateResponse = null;
+    ArrayList<ResourceRequest> askList = null;
+    ArrayList<ContainerId> releaseList = null;
+    AllocateRequest allocateRequest = null;
+    
+    try {
+      synchronized (this) {
+        askList = new ArrayList<ResourceRequest>(ask);
+        releaseList = new ArrayList<ContainerId>(release);
+        // optimistically clear this collection assuming no RPC failure
+        ask.clear();
+        release.clear();
+        allocateRequest = BuilderUtils
+            .newAllocateRequest(appAttemptId, lastResponseId, progressIndicator,
+                askList, releaseList);
+      }
+
+      allocateResponse = rmClient.allocate(allocateRequest);
+      AMResponse response = allocateResponse.getAMResponse();
+
+      synchronized (this) {
+        // update these on successful RPC
+        clusterNodeCount = allocateResponse.getNumClusterNodes();
+        lastResponseId = response.getResponseId();
+        clusterAvailableResources = response.getAvailableResources();
+      }
+    } finally {
+      // TODO how to differentiate remote yarn exception vs error in rpc
+      if(allocateResponse == null) {
+        // we hit an exception in allocate()
+        // preserve ask and release for next call to allocate()
+        synchronized (this) {
+          release.addAll(releaseList);
+          // requests could have been added or deleted during call to allocate
+          // If requests were added/removed then there is nothing to do since
+          // the ResourceRequest object in ask would have the actual new value.
+          // If ask does not have this ResourceRequest then it was unchanged and
+          // so we can add the value back safely.
+          // This assumes that there will no concurrent calls to allocate() and
+          // so we dont have to worry about ask being changed in the
+          // synchronized block at the beginning of this method.
+          for(ResourceRequest oldAsk : askList) {
+            if(!ask.contains(oldAsk)) {
+              ask.add(oldAsk);
+            }
+          }
+        }
+      }
+    }
+    return allocateResponse;
+  }
+
+  @Override
+  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+      String appMessage, String appTrackingUrl) throws YarnRemoteException {
+    FinishApplicationMasterRequest request = recordFactory
+                  .newRecordInstance(FinishApplicationMasterRequest.class);
+    request.setAppAttemptId(appAttemptId);
+    request.setFinishApplicationStatus(appStatus);
+    if(appMessage != null) {
+      request.setDiagnostics(appMessage);
+    }
+    if(appTrackingUrl != null) {
+      request.setTrackingUrl(appTrackingUrl);
+    }
+    rmClient.finishApplicationMaster(request);
+  }
+  
+  @Override
+  public synchronized void addContainerRequest(ContainerRequest req) {
+    // Create resource requests
+    if(req.hosts != null) {
+      for (String host : req.hosts) {
+        addResourceRequest(req.priority, host, req.capability, req.containerCount);
+      }
+    }
+
+    if(req.racks != null) {
+      for (String rack : req.racks) {
+        addResourceRequest(req.priority, rack, req.capability, req.containerCount);
+      }
+    }
+
+    // Off-switch
+    addResourceRequest(req.priority, ANY, req.capability, req.containerCount); 
+  }
+
+  @Override
+  public synchronized void removeContainerRequest(ContainerRequest req) {
+    // Update resource requests
+    if(req.hosts != null) {
+      for (String hostName : req.hosts) {
+        decResourceRequest(req.priority, hostName, req.capability, req.containerCount);
+      }
+    }
+    
+    if(req.racks != null) {
+      for (String rack : req.racks) {
+        decResourceRequest(req.priority, rack, req.capability, req.containerCount);
+      }
+    }
+   
+    decResourceRequest(req.priority, ANY, req.capability, req.containerCount);
+  }
+
+  @Override
+  public synchronized void releaseAssignedContainer(ContainerId containerId) {
+    release.add(containerId);
+  }
+  
+  @Override
+  public synchronized Resource getClusterAvailableResources() {
+    return clusterAvailableResources;
+  }
+  
+  @Override
+  public synchronized int getClusterNodeCount() {
+    return clusterNodeCount;
+  }
+  
+  private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
+    // This code looks weird but is needed because of the following scenario.
+    // A ResourceRequest is removed from the remoteRequestTable. A 0 container 
+    // request is added to 'ask' to notify the RM about not needing it any more.
+    // Before the call to allocate, the user now requests more containers. If 
+    // the locations of the 0 size request and the new request are the same
+    // (with the difference being only container count), then the set comparator
+    // will consider both to be the same and not add the new request to ask. So 
+    // we need to check for the "same" request being present and remove it and 
+    // then add it back. The comparator is container count agnostic.
+    // This should happen only rarely but we do need to guard against it.
+    if(ask.contains(remoteRequest)) {
+      ask.remove(remoteRequest);
+    }
+    ask.add(remoteRequest);
+  }
+
+  private void addResourceRequest(Priority priority, String resourceName,
+      Resource capability, int containerCount) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+    if (remoteRequests == null) {
+      remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
+      this.remoteRequestsTable.put(priority, remoteRequests);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added priority=" + priority);
+      }
+    }
+    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      reqMap = new HashMap<Resource, ResourceRequest>();
+      remoteRequests.put(resourceName, reqMap);
+    }
+    ResourceRequest remoteRequest = reqMap.get(capability);
+    if (remoteRequest == null) {
+      remoteRequest = BuilderUtils.
+          newResourceRequest(priority, resourceName, capability, 0);
+      reqMap.put(capability, remoteRequest);
+    }
+    
+    remoteRequest.setNumContainers(remoteRequest.getNumContainers() + containerCount);
+
+    // Note this down for next interaction with ResourceManager
+    addResourceRequestToAsk(remoteRequest);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addResourceRequest:" + " applicationId="
+          + appAttemptId + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
+  }
+
+  private void decResourceRequest(Priority priority, String resourceName,
+      Resource capability, int containerCount) {
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+      this.remoteRequestsTable.get(priority);
+    
+    if(remoteRequests == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as priority " + priority 
+            + " is not present in request table");
+      }
+      return;
+    }
+    
+    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as " + resourceName
+            + " is not present in request table");
+      }
+      return;
+    }
+    ResourceRequest remoteRequest = reqMap.get(capability);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+          + appAttemptId + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
+
+    remoteRequest.
+        setNumContainers(remoteRequest.getNumContainers() - containerCount);
+    if(remoteRequest.getNumContainers() < 0) {
+      // guard against spurious removals
+      remoteRequest.setNumContainers(0);
+    }
+    // send the ResourceRequest to RM even if is 0 because it needs to override
+    // a previously sent value. If ResourceRequest was not sent previously then
+    // sending 0 aught to be a no-op on RM
+    addResourceRequestToAsk(remoteRequest);
+
+    // delete entries from map if no longer needed
+    if (remoteRequest.getNumContainers() == 0) {
+      reqMap.remove(capability);
+      if (reqMap.size() == 0) {
+        remoteRequests.remove(resourceName);
+      }
+      if (remoteRequests.size() == 0) {
+        remoteRequestsTable.remove(priority);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.info("AFTER decResourceRequest:" + " applicationId="
+          + appAttemptId + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
+  }
+
+}

+ 297 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java

@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.util.Records;
+
+public class TestAMRMClient {
+  Configuration conf = null;
+  MiniYARNCluster yarnCluster = null;
+  YarnClientImpl yarnClient = null;
+  List<NodeReport> nodeReports = null;
+  ApplicationAttemptId attemptId = null;
+  int nodeCount = 3;
+  
+  @Before
+  public void setup() throws YarnRemoteException {
+    // start minicluster
+    conf = new YarnConfiguration();
+    yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    yarnCluster.init(conf);
+    yarnCluster.start();
+
+    // start rm client
+    yarnClient = new YarnClientImpl();
+    yarnClient.init(conf);
+    yarnClient.start();
+
+    // get node info
+    nodeReports = yarnClient.getNodeReports();
+
+    // submit new app
+    GetNewApplicationResponse newApp = yarnClient.getNewApplication();
+    ApplicationId appId = newApp.getApplicationId();
+
+    ApplicationSubmissionContext appContext = Records
+        .newRecord(ApplicationSubmissionContext.class);
+    // set the application id
+    appContext.setApplicationId(appId);
+    // set the application name
+    appContext.setApplicationName("Test");
+    // Set the priority for the application master
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(0);
+    appContext.setPriority(pri);
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue("default");
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = Records
+        .newRecord(ContainerLaunchContext.class);
+    appContext.setAMContainerSpec(amContainer);
+    // unmanaged AM
+    appContext.setUnmanagedAM(true);
+    // Create the request to send to the applications manager
+    SubmitApplicationRequest appRequest = Records
+        .newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+    // Submit the application to the applications manager
+    yarnClient.submitApplication(appContext);
+
+    // wait for app to start
+    while (true) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+        attemptId = appReport.getCurrentApplicationAttemptId();
+        break;
+      }
+    }
+  }
+  
+  @After
+  public void tearDown() {
+    if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) {
+      yarnClient.stop();
+    }
+    if (yarnCluster != null && yarnCluster.getServiceState() == STATE.STARTED) {
+      yarnCluster.stop();
+    }
+  }
+
+  @Test (timeout=60000)
+  public void testAMRMClient() throws YarnRemoteException {
+    AMRMClientImpl amClient = null;
+    try {
+      // start am rm client
+      amClient = new AMRMClientImpl(attemptId);
+      amClient.init(conf);
+      amClient.start();
+
+      amClient.registerApplicationMaster("Host", 10000, "");
+
+      testAllocation(amClient);
+
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
+  
+  
+  private void testAllocation(final AMRMClientImpl amClient)  
+      throws YarnRemoteException {
+    // setup container request
+    final Resource capability = Records.newRecord(Resource.class);
+    final Priority priority = Records.newRecord(Priority.class);
+    priority.setPriority(0);
+    capability.setMemory(1024);
+    String node = nodeReports.get(0).getNodeId().getHost();
+    String rack = nodeReports.get(0).getRackName();
+    final String[] nodes = { node };
+    final String[] racks = { rack };
+    
+    assertTrue(amClient.ask.size() == 0);
+    assertTrue(amClient.release.size() == 0);
+    
+    amClient.addContainerRequest(new ContainerRequest(capability, nodes,
+        racks, priority, 1));
+    amClient.addContainerRequest(new ContainerRequest(capability, nodes,
+        racks, priority, 3));
+    amClient.removeContainerRequest(new ContainerRequest(capability, nodes,
+        racks, priority, 2));
+    
+    int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
+        .get(node).get(capability).getNumContainers();
+    int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
+        .get(rack).get(capability).getNumContainers();
+    int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
+        .get(AMRMClient.ANY).get(capability).getNumContainers();
+
+    assertTrue(containersRequestedNode == 2);
+    assertTrue(containersRequestedRack == 2);
+    assertTrue(containersRequestedAny == 2);
+    assertTrue(amClient.ask.size() == 3);
+    assertTrue(amClient.release.size() == 0);
+
+    // RM should allocate container within 2 calls to allocate()
+    int allocatedContainerCount = 0;
+    int iterationsLeft = 2;
+    Set<ContainerId> releases = new TreeSet<ContainerId>();
+    while (allocatedContainerCount < containersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertTrue(amClient.ask.size() == 0);
+      assertTrue(amClient.release.size() == 0);
+      
+      assertTrue(nodeCount == amClient.getClusterNodeCount());
+      AMResponse amResponse = allocResponse.getAMResponse();
+      allocatedContainerCount += amResponse.getAllocatedContainers().size();
+      for(Container container : amResponse.getAllocatedContainers()) {
+        ContainerId rejectContainerId = container.getId();
+        releases.add(rejectContainerId);
+        amClient.releaseAssignedContainer(rejectContainerId);
+      }
+      if(allocatedContainerCount < containersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(1000);
+      }
+    }
+
+    assertTrue(allocatedContainerCount == containersRequestedAny);
+    assertTrue(amClient.release.size() == 2);
+    assertTrue(amClient.ask.size() == 0);
+    
+    // need to tell the AMRMClient that we dont need these resources anymore
+    amClient.removeContainerRequest(new ContainerRequest(capability, nodes,
+        racks, priority, 2));
+    assertTrue(amClient.ask.size() == 3);
+    // send 0 container count request for resources that are no longer needed
+    ResourceRequest snoopRequest = amClient.ask.iterator().next();
+    assertTrue(snoopRequest.getNumContainers() == 0);
+    
+    // test RPC exception handling
+    amClient.addContainerRequest(new ContainerRequest(capability, nodes,
+        racks, priority, 2));
+    snoopRequest = amClient.ask.iterator().next();
+    assertTrue(snoopRequest.getNumContainers() == 2);
+    
+    AMRMProtocol realRM = amClient.rmClient;
+    try {
+      AMRMProtocol mockRM = mock(AMRMProtocol.class);
+      when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer(
+          new Answer<AllocateResponse>() {
+            public AllocateResponse answer(InvocationOnMock invocation)
+                throws Exception {
+              amClient.removeContainerRequest(new ContainerRequest(capability,
+                  nodes, racks, priority, 2));
+              throw new Exception();
+            }
+          });
+      amClient.rmClient = mockRM;
+      amClient.allocate(0.1f);
+    }catch (Exception ioe) {}
+    finally {
+      amClient.rmClient = realRM;
+    }
+
+    assertTrue(amClient.release.size() == 2);
+    assertTrue(amClient.ask.size() == 3);
+    snoopRequest = amClient.ask.iterator().next();
+    // verify that the remove request made in between makeRequest and allocate 
+    // has not been lost
+    assertTrue(snoopRequest.getNumContainers() == 0);
+    
+    iterationsLeft = 2;
+    // do a few iterations to ensure RM is not going send new containers
+    while(!releases.isEmpty() || iterationsLeft-- > 0) {
+      // inform RM of rejection
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      AMResponse amResponse = allocResponse.getAMResponse();
+      // RM did not send new containers because AM does not need any
+      assertTrue(amResponse.getAllocatedContainers().size() == 0);
+      if(amResponse.getCompletedContainersStatuses().size() > 0) {
+        for(ContainerStatus cStatus : amResponse.getCompletedContainersStatuses()) {
+          if(releases.contains(cStatus.getContainerId())) {
+            assertTrue(cStatus.getState() == ContainerState.COMPLETE);
+            assertTrue(cStatus.getExitStatus() == -100);
+            releases.remove(cStatus.getContainerId());
+          }
+        }
+      }
+      if(iterationsLeft > 0) {
+        // sleep to make sure NM's heartbeat
+        sleep(1000);
+      }
+    }
+    
+    assertTrue(amClient.ask.size() == 0);
+    assertTrue(amClient.release.size() == 0);
+  }
+  
+  private void sleep(int sleepTime) {
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+}