Browse Source

YARN-5124. Modify AMRMClient to set the ExecutionType in the ResourceRequest. (asuresh)

Arun Suresh 9 năm trước cách đây
mục cha
commit
5143277958
11 tập tin đã thay đổi với 1204 bổ sung586 xóa
  1. 35 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
  2. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
  3. 107 187
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
  4. 332 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
  5. 197 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java
  6. 16 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
  7. 52 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
  8. 14 157
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
  9. 427 217
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
  10. 4 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
  11. 3 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java

+ 35 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -109,7 +110,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     final Priority priority;
     final boolean relaxLocality;
     final String nodeLabelsExpression;
-    final ExecutionType executionType;
+    final ExecutionTypeRequest executionTypeRequest;
     
     /**
      * Instantiates a {@link ContainerRequest} with the given constraints and
@@ -180,7 +181,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
         Priority priority, boolean relaxLocality, String nodeLabelsExpression) {
       this(capability, nodes, racks, priority, relaxLocality,
           nodeLabelsExpression,
-          ExecutionType.GUARANTEED);
+          ExecutionTypeRequest.newInstance());
     }
           
     /**
@@ -203,12 +204,12 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
      * @param nodeLabelsExpression
      *          Set node labels to allocate resource, now we only support
      *          asking for only a single node label
-     * @param executionType
+     * @param executionTypeRequest
      *          Set the execution type of the container request.
      */
     public ContainerRequest(Resource capability, String[] nodes, String[] racks,
         Priority priority, boolean relaxLocality, String nodeLabelsExpression,
-        ExecutionType executionType) {
+        ExecutionTypeRequest executionTypeRequest) {
       // Validate request
       Preconditions.checkArgument(capability != null,
           "The Resource to be requested for each container " +
@@ -226,7 +227,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
       this.priority = priority;
       this.relaxLocality = relaxLocality;
       this.nodeLabelsExpression = nodeLabelsExpression;
-      this.executionType = executionType;
+      this.executionTypeRequest = executionTypeRequest;
     }
     
     public Resource getCapability() {
@@ -253,15 +254,16 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
       return nodeLabelsExpression;
     }
     
-    public ExecutionType getExecutionType() {
-      return executionType;
+    public ExecutionTypeRequest getExecutionTypeRequest() {
+      return executionTypeRequest;
     }
 
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("Capability[").append(capability).append("]");
       sb.append("Priority[").append(priority).append("]");
-      sb.append("ExecutionType[").append(executionType).append("]");
+      sb.append("ExecutionTypeRequest[").append(executionTypeRequest)
+          .append("]");
       return sb.toString();
     }
   }
@@ -388,10 +390,35 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
    * collection, requests will be returned in the same order as they were added.
    * @return Collection of request matching the parameters
    */
+  @InterfaceStability.Evolving
   public abstract List<? extends Collection<T>> getMatchingRequests(
                                            Priority priority, 
                                            String resourceName, 
                                            Resource capability);
+
+  /**
+   * Get outstanding <code>ContainerRequest</code>s matching the given
+   * parameters. These ContainerRequests should have been added via
+   * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
+   * the AMRMClient may return its internal collection directly without creating
+   * a copy. Users should not perform mutable operations on the return value.
+   * Each collection in the list contains requests with identical
+   * <code>Resource</code> size that fit in the given capability. In a
+   * collection, requests will be returned in the same order as they were added.
+   * specify an <code>ExecutionType</code> .
+   * @param priority Priority
+   * @param resourceName Location
+   * @param executionType ExecutionType
+   * @param capability Capability
+   * @return Collection of request matching the parameters
+   */
+  @InterfaceStability.Evolving
+  public List<? extends Collection<T>> getMatchingRequests(
+      Priority priority, String resourceName, ExecutionType executionType,
+      Resource capability) {
+    throw new UnsupportedOperationException("The sub-class extending" +
+        " AMRMClient is expected to implement this !!");
+  }
   
   /**
    * Update application's blacklist with addition or removal resources.

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -196,6 +197,22 @@ extends AbstractService {
                                                    Priority priority, 
                                                    String resourceName, 
                                                    Resource capability);
+
+  /**
+   * Returns all matching ContainerRequests that match the given Priority,
+   * ResourceName, ExecutionType and Capability.
+   * @param priority Priority.
+   * @param resourceName Location.
+   * @param executionType ExecutionType.
+   * @param capability Capability.
+   * @return All matching ContainerRequests
+   */
+  public List<? extends Collection<T>> getMatchingRequests(
+      Priority priority, String resourceName, ExecutionType executionType,
+      Resource capability) {
+    return client.getMatchingRequests(priority, resourceName,
+        executionType, capability);
+  }
   
   /**
    * Registers this application master with the resource manager. On successful

+ 107 - 187
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java

@@ -19,19 +19,19 @@
 package org.apache.hadoop.yarn.client.api.impl;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.AbstractMap.SimpleEntry;
 
@@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -102,7 +104,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   protected final Set<String> blacklistAdditions = new HashSet<String>();
   protected final Set<String> blacklistRemovals = new HashSet<String>();
   
-  class ResourceRequestInfo {
+  static class ResourceRequestInfo<T> {
     ResourceRequest remoteRequest;
     LinkedHashSet<T> containerRequests;
     
@@ -115,11 +117,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     }
   }
 
-
   /**
    * Class compares Resource by memory then cpu in reverse order
    */
-  class ResourceReverseMemoryThenCpuComparator implements Comparator<Resource> {
+  static class ResourceReverseMemoryThenCpuComparator implements
+      Comparator<Resource>, Serializable {
+    static final long serialVersionUID = 12345L;
     @Override
     public int compare(Resource arg0, Resource arg1) {
       long mem0 = arg0.getMemorySize();
@@ -141,7 +144,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       return -1;
     }    
   }
-  
+
   static boolean canFit(Resource arg0, Resource arg1) {
     long mem0 = arg0.getMemorySize();
     long mem1 = arg1.getMemorySize();
@@ -150,17 +153,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     
     return (mem0 <= mem1 && cpu0 <= cpu1);
   }
-  
-  //Key -> Priority
-  //Value -> Map
-  //Key->ResourceName (e.g., nodename, rackname, *)
-  //Value->Map
-  //Key->Resource Capability
-  //Value->ResourceRequest
-  protected final 
-  Map<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>
-    remoteRequestsTable =
-    new TreeMap<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>();
+
+  final RemoteRequestsTable remoteRequestsTable = new RemoteRequestsTable<T>();
 
   protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
       new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
@@ -185,6 +179,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     super(AMRMClientImpl.class.getName());
   }
 
+  @VisibleForTesting
+  AMRMClientImpl(ApplicationMasterProtocol protocol) {
+    super(AMRMClientImpl.class.getName());
+    this.rmClient = protocol;
+  }
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     RackResolver.init(conf);
@@ -195,8 +195,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   protected void serviceStart() throws Exception {
     final YarnConfiguration conf = new YarnConfiguration(getConfig());
     try {
-      rmClient =
-          ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
+      if (rmClient == null) {
+        rmClient = ClientRMProxy.createRMProxy(
+            conf, ApplicationMasterProtocol.class);
+      }
     } catch (IOException e) {
       throw new YarnRuntimeException(e);
     }
@@ -263,7 +265,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
           // RPC layer is using it to send info across
           askList.add(ResourceRequest.newInstance(r.getPriority(),
               r.getResourceName(), r.getCapability(), r.getNumContainers(),
-              r.getRelaxLocality(), r.getNodeLabelExpression()));
+              r.getRelaxLocality(), r.getNodeLabelExpression(),
+              r.getExecutionTypeRequest()));
         }
         List<ContainerResourceChangeRequest> increaseList = new ArrayList<>();
         List<ContainerResourceChangeRequest> decreaseList = new ArrayList<>();
@@ -315,13 +318,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
         synchronized (this) {
           release.addAll(this.pendingRelease);
           blacklistAdditions.addAll(this.blacklistedNodes);
-          for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable
-            .values()) {
-            for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) {
-              for (ResourceRequestInfo request : capabalities.values()) {
-                addResourceRequestToAsk(request.remoteRequest);
-              }
-            }
+          @SuppressWarnings("unchecked")
+          Iterator<ResourceRequestInfo<T>> reqIter =
+              remoteRequestsTable.iterator();
+          while (reqIter.hasNext()) {
+            addResourceRequestToAsk(reqIter.next().remoteRequest);
           }
           change.putAll(this.pendingChange);
         }
@@ -517,26 +518,28 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
             + joiner.join(req.getNodes()));        
       }
       for (String node : dedupedNodes) {
-        addResourceRequest(req.getPriority(), node, req.getCapability(), req,
-            true, req.getNodeLabelExpression());
+        addResourceRequest(req.getPriority(), node,
+            req.getExecutionTypeRequest(), req.getCapability(), req, true,
+            req.getNodeLabelExpression());
       }
     }
 
     for (String rack : dedupedRacks) {
-      addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
-          true, req.getNodeLabelExpression());
+      addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
+          req.getCapability(), req, true, req.getNodeLabelExpression());
     }
 
     // Ensure node requests are accompanied by requests for
     // corresponding rack
     for (String rack : inferredRacks) {
-      addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
-          req.getRelaxLocality(), req.getNodeLabelExpression());
+      addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
+          req.getCapability(), req, req.getRelaxLocality(),
+          req.getNodeLabelExpression());
     }
-
     // Off-switch
-    addResourceRequest(req.getPriority(), ResourceRequest.ANY, 
-        req.getCapability(), req, req.getRelaxLocality(), req.getNodeLabelExpression());
+    addResourceRequest(req.getPriority(), ResourceRequest.ANY,
+        req.getExecutionTypeRequest(), req.getCapability(), req,
+        req.getRelaxLocality(), req.getNodeLabelExpression());
   }
 
   @Override
@@ -552,16 +555,18 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     // Update resource requests
     if (req.getNodes() != null) {
       for (String node : new HashSet<String>(req.getNodes())) {
-        decResourceRequest(req.getPriority(), node, req.getCapability(), req);
+        decResourceRequest(req.getPriority(), node,
+            req.getExecutionTypeRequest(), req.getCapability(), req);
       }
     }
 
     for (String rack : allRacks) {
-      decResourceRequest(req.getPriority(), rack, req.getCapability(), req);
+      decResourceRequest(req.getPriority(), rack,
+          req.getExecutionTypeRequest(), req.getCapability(), req);
     }
 
     decResourceRequest(req.getPriority(), ResourceRequest.ANY,
-        req.getCapability(), req);
+        req.getExecutionTypeRequest(), req.getCapability(), req);
   }
 
   @Override
@@ -601,47 +606,38 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   public synchronized int getClusterNodeCount() {
     return clusterNodeCount;
   }
-  
+
+  @Override
+  public synchronized List<? extends Collection<T>> getMatchingRequests(
+      Priority priority,
+      String resourceName,
+      Resource capability) {
+    return getMatchingRequests(priority, resourceName,
+        ExecutionType.GUARANTEED, capability);
+  }
+
   @Override
   public synchronized List<? extends Collection<T>> getMatchingRequests(
-                                          Priority priority, 
-                                          String resourceName, 
-                                          Resource capability) {
+      Priority priority, String resourceName, ExecutionType executionType,
+      Resource capability) {
     Preconditions.checkArgument(capability != null,
         "The Resource to be requested should not be null ");
     Preconditions.checkArgument(priority != null,
         "The priority at which to request containers should not be null ");
     List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>();
-    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests = 
-        this.remoteRequestsTable.get(priority);
-    if (remoteRequests == null) {
-      return list;
-    }
-    TreeMap<Resource, ResourceRequestInfo> reqMap = remoteRequests
-        .get(resourceName);
-    if (reqMap == null) {
-      return list;
-    }
 
-    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
-    if (resourceRequestInfo != null &&
-        !resourceRequestInfo.containerRequests.isEmpty()) {
-      list.add(resourceRequestInfo.containerRequests);
-      return list;
-    }
-    
-    // no exact match. Container may be larger than what was requested.
-    // get all resources <= capability. map is reverse sorted. 
-    SortedMap<Resource, ResourceRequestInfo> tailMap = 
-                                                  reqMap.tailMap(capability);
-    for(Map.Entry<Resource, ResourceRequestInfo> entry : tailMap.entrySet()) {
-      if (canFit(entry.getKey(), capability) &&
-          !entry.getValue().containerRequests.isEmpty()) {
-        // match found that fits in the larger resource
-        list.add(entry.getValue().containerRequests);
+    @SuppressWarnings("unchecked")
+    List<ResourceRequestInfo<T>> matchingRequests =
+        this.remoteRequestsTable.getMatchingRequests(priority, resourceName,
+            executionType, capability);
+    // If no exact match. Container may be larger than what was requested.
+    // get all resources <= capability. map is reverse sorted.
+    for (ResourceRequestInfo<T> resReqInfo : matchingRequests) {
+      if (canFit(resReqInfo.remoteRequest.getCapability(), capability) &&
+        !resReqInfo.containerRequests.isEmpty()) {
+        list.add(resReqInfo.containerRequests);
       }
     }
-    
     // no match found
     return list;          
   }
@@ -663,34 +659,30 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     
     return racks;
   }
-  
+
   /**
    * ContainerRequests with locality relaxation cannot be made at the same
    * priority as ContainerRequests without locality relaxation.
    */
   private void checkLocalityRelaxationConflict(Priority priority,
       Collection<String> locations, boolean relaxLocality) {
-    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
-        this.remoteRequestsTable.get(priority);
-    if (remoteRequests == null) {
-      return;
-    }
     // Locality relaxation will be set to relaxLocality for all implicitly
     // requested racks. Make sure that existing rack requests match this.
-    for (String location : locations) {
-        TreeMap<Resource, ResourceRequestInfo> reqs =
-            remoteRequests.get(location);
-        if (reqs != null && !reqs.isEmpty()) {
-          boolean existingRelaxLocality =
-              reqs.values().iterator().next().remoteRequest.getRelaxLocality();
-          if (relaxLocality != existingRelaxLocality) {
-            throw new InvalidContainerRequestException("Cannot submit a "
-                + "ContainerRequest asking for location " + location
-                + " with locality relaxation " + relaxLocality + " when it has "
-                + "already been requested with locality relaxation " + existingRelaxLocality);
-          }
-        }
+
+    @SuppressWarnings("unchecked")
+    List<ResourceRequestInfo> allCapabilityMaps =
+        remoteRequestsTable.getAllResourceRequestInfos(priority, locations);
+    for (ResourceRequestInfo reqs : allCapabilityMaps) {
+      ResourceRequest remoteRequest = reqs.remoteRequest;
+      boolean existingRelaxLocality = remoteRequest.getRelaxLocality();
+      if (relaxLocality != existingRelaxLocality) {
+        throw new InvalidContainerRequestException("Cannot submit a "
+            + "ContainerRequest asking for location "
+            + remoteRequest.getResourceName() + " with locality relaxation "
+            + relaxLocality + " when it has already been requested"
+            + "with locality relaxation " + existingRelaxLocality);
       }
+    }
   }
   
   /**
@@ -747,46 +739,13 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     ask.add(remoteRequest);
   }
 
-  private void
-      addResourceRequest(Priority priority, String resourceName,
-          Resource capability, T req, boolean relaxLocality,
-          String labelExpression) {
-    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
-      this.remoteRequestsTable.get(priority);
-    if (remoteRequests == null) {
-      remoteRequests = 
-          new HashMap<String, TreeMap<Resource, ResourceRequestInfo>>();
-      this.remoteRequestsTable.put(priority, remoteRequests);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Added priority=" + priority);
-      }
-    }
-    TreeMap<Resource, ResourceRequestInfo> reqMap = 
-                                          remoteRequests.get(resourceName);
-    if (reqMap == null) {
-      // capabilities are stored in reverse sorted order. smallest last.
-      reqMap = new TreeMap<Resource, ResourceRequestInfo>(
-          new ResourceReverseMemoryThenCpuComparator());
-      remoteRequests.put(resourceName, reqMap);
-    }
-    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
-    if (resourceRequestInfo == null) {
-      resourceRequestInfo =
-          new ResourceRequestInfo(priority, resourceName, capability,
-              relaxLocality);
-      reqMap.put(capability, resourceRequestInfo);
-    }
-    
-    resourceRequestInfo.remoteRequest.setNumContainers(
-         resourceRequestInfo.remoteRequest.getNumContainers() + 1);
-
-    if (relaxLocality) {
-      resourceRequestInfo.containerRequests.add(req);
-    }
-    
-    if (ResourceRequest.ANY.equals(resourceName)) {
-      resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
-    }
+  private void addResourceRequest(Priority priority, String resourceName,
+      ExecutionTypeRequest execTypeReq, Resource capability, T req,
+      boolean relaxLocality, String labelExpression) {
+    @SuppressWarnings("unchecked")
+    ResourceRequestInfo resourceRequestInfo = remoteRequestsTable
+        .addResourceRequest(priority, resourceName,
+        execTypeReq, capability, req, relaxLocality, labelExpression);
 
     // Note this down for next interaction with ResourceManager
     addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
@@ -800,70 +759,31 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     }
   }
 
-  private void decResourceRequest(Priority priority, 
-                                   String resourceName,
-                                   Resource capability, 
-                                   T req) {
-    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
-      this.remoteRequestsTable.get(priority);
-    
-    if(remoteRequests == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not decrementing resource as priority " + priority 
-            + " is not present in request table");
-      }
-      return;
-    }
-    
-    Map<Resource, ResourceRequestInfo> reqMap = remoteRequests.get(resourceName);
-    if (reqMap == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not decrementing resource as " + resourceName
-            + " is not present in request table");
-      }
-      return;
-    }
-    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
-          + " priority=" + priority.getPriority()
-          + " resourceName=" + resourceName + " numContainers="
-          + resourceRequestInfo.remoteRequest.getNumContainers() 
-          + " #asks=" + ask.size());
-    }
-
-    resourceRequestInfo.remoteRequest.setNumContainers(
-        resourceRequestInfo.remoteRequest.getNumContainers() - 1);
-
-    resourceRequestInfo.containerRequests.remove(req);
-    
-    if(resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
-      // guard against spurious removals
-      resourceRequestInfo.remoteRequest.setNumContainers(0);
-    }
+  private void decResourceRequest(Priority priority, String resourceName,
+      ExecutionTypeRequest execTypeReq, Resource capability, T req) {
+    @SuppressWarnings("unchecked")
+    ResourceRequestInfo resourceRequestInfo =
+        remoteRequestsTable.decResourceRequest(priority, resourceName,
+            execTypeReq, capability, req);
     // send the ResourceRequest to RM even if is 0 because it needs to override
     // a previously sent value. If ResourceRequest was not sent previously then
     // sending 0 aught to be a no-op on RM
-    addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
+    if (resourceRequestInfo != null) {
+      addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
 
-    // delete entries from map if no longer needed
-    if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
-      reqMap.remove(capability);
-      if (reqMap.size() == 0) {
-        remoteRequests.remove(resourceName);
+      // delete entry from map if no longer needed
+      if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
+        this.remoteRequestsTable.remove(priority, resourceName,
+            execTypeReq.getExecutionType(), capability);
       }
-      if (remoteRequests.size() == 0) {
-        remoteRequestsTable.remove(priority);
-      }
-    }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("AFTER decResourceRequest:" + " applicationId="
-          + " priority=" + priority.getPriority()
-          + " resourceName=" + resourceName + " numContainers="
-          + resourceRequestInfo.remoteRequest.getNumContainers() 
-          + " #asks=" + ask.size());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AFTER decResourceRequest:" + " applicationId="
+            + " priority=" + priority.getPriority()
+            + " resourceName=" + resourceName + " numContainers="
+            + resourceRequestInfo.remoteRequest.getNumContainers()
+            + " #asks=" + ask.size());
+      }
     }
   }
 

+ 332 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java

@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceRequestInfo;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceReverseMemoryThenCpuComparator;
+
+class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
+
+  private static final Log LOG = LogFactory.getLog(RemoteRequestsTable.class);
+
+  static ResourceReverseMemoryThenCpuComparator resourceComparator =
+      new ResourceReverseMemoryThenCpuComparator();
+
+  /**
+   * Nested Iterator that iterates over just the ResourceRequestInfo
+   * object.
+   */
+  class RequestInfoIterator implements Iterator<ResourceRequestInfo> {
+    private Iterator<Map<String, Map<ExecutionType, TreeMap<Resource,
+        ResourceRequestInfo>>>> iLocMap;
+    private Iterator<Map<ExecutionType, TreeMap<Resource,
+        ResourceRequestInfo>>> iExecTypeMap;
+    private Iterator<TreeMap<Resource, ResourceRequestInfo>> iCapMap;
+    private Iterator<ResourceRequestInfo> iResReqInfo;
+
+    public RequestInfoIterator(Iterator<Map<String,
+        Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>>>
+        iLocationMap) {
+      this.iLocMap = iLocationMap;
+      if (iLocMap.hasNext()) {
+        iExecTypeMap = iLocMap.next().values().iterator();
+      } else {
+        iExecTypeMap =
+            new LinkedList<Map<ExecutionType, TreeMap<Resource,
+                ResourceRequestInfo>>>().iterator();
+      }
+      if (iExecTypeMap.hasNext()) {
+        iCapMap = iExecTypeMap.next().values().iterator();
+      } else {
+        iCapMap =
+            new LinkedList<TreeMap<Resource, ResourceRequestInfo>>()
+                .iterator();
+      }
+      if (iCapMap.hasNext()) {
+        iResReqInfo = iCapMap.next().values().iterator();
+      } else {
+        iResReqInfo = new LinkedList<ResourceRequestInfo>().iterator();
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iLocMap.hasNext()
+          || iExecTypeMap.hasNext()
+          || iCapMap.hasNext()
+          || iResReqInfo.hasNext();
+    }
+
+    @Override
+    public ResourceRequestInfo next() {
+      if (!iResReqInfo.hasNext()) {
+        if (!iCapMap.hasNext()) {
+          if (!iExecTypeMap.hasNext()) {
+            iExecTypeMap = iLocMap.next().values().iterator();
+          }
+          iCapMap = iExecTypeMap.next().values().iterator();
+        }
+        iResReqInfo = iCapMap.next().values().iterator();
+      }
+      return iResReqInfo.next();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Remove is not supported" +
+          "for this iterator !!");
+    }
+  }
+
+  // Nest map with Primary key :
+  // Priority -> ResourceName(String) -> ExecutionType -> Capability(Resource)
+  // and value : ResourceRequestInfo
+  private Map<Priority, Map<String, Map<ExecutionType, TreeMap<Resource,
+      ResourceRequestInfo>>>> remoteRequestsTable = new HashMap<>();
+
+  @Override
+  public Iterator<ResourceRequestInfo> iterator() {
+    return new RequestInfoIterator(remoteRequestsTable.values().iterator());
+  }
+
+  ResourceRequestInfo get(Priority priority, String location,
+      ExecutionType execType, Resource capability) {
+    TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+        getCapabilityMap(priority, location, execType);
+    if (capabilityMap == null) {
+      return null;
+    }
+    return capabilityMap.get(capability);
+  }
+
+  void put(Priority priority, String resourceName, ExecutionType execType,
+      Resource capability, ResourceRequestInfo resReqInfo) {
+    Map<String, Map<ExecutionType, TreeMap<Resource,
+        ResourceRequestInfo>>> locationMap =
+        remoteRequestsTable.get(priority);
+    if (locationMap == null) {
+      locationMap = new HashMap<>();
+      this.remoteRequestsTable.put(priority, locationMap);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added priority=" + priority);
+      }
+    }
+    Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>> execTypeMap =
+        locationMap.get(resourceName);
+    if (execTypeMap == null) {
+      execTypeMap = new HashMap<>();
+      locationMap.put(resourceName, execTypeMap);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added resourceName=" + resourceName);
+      }
+    }
+    TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+        execTypeMap.get(execType);
+    if (capabilityMap == null) {
+      capabilityMap = new TreeMap<>(resourceComparator);
+      execTypeMap.put(execType, capabilityMap);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added Execution Type=" + execType);
+      }
+    }
+    capabilityMap.put(capability, resReqInfo);
+  }
+
+  ResourceRequestInfo remove(Priority priority, String resourceName,
+      ExecutionType execType, Resource capability) {
+    ResourceRequestInfo retVal = null;
+    Map<String, Map<ExecutionType, TreeMap<Resource,
+        ResourceRequestInfo>>> locationMap = remoteRequestsTable.get(priority);
+    if (locationMap == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No such priority=" + priority);
+      }
+      return null;
+    }
+    Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+        execTypeMap = locationMap.get(resourceName);
+    if (execTypeMap == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No such resourceName=" + resourceName);
+      }
+      return null;
+    }
+    TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+        execTypeMap.get(execType);
+    if (capabilityMap == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No such Execution Type=" + execType);
+      }
+      return null;
+    }
+    retVal = capabilityMap.remove(capability);
+    if (capabilityMap.size() == 0) {
+      execTypeMap.remove(execType);
+      if (execTypeMap.size() == 0) {
+        locationMap.remove(resourceName);
+        if (locationMap.size() == 0) {
+          this.remoteRequestsTable.remove(priority);
+        }
+      }
+    }
+    return retVal;
+  }
+
+  Map<String, Map<ExecutionType, TreeMap<Resource,
+      ResourceRequestInfo>>> getLocationMap(Priority priority) {
+    return remoteRequestsTable.get(priority);
+  }
+
+  Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+      getExecutionTypeMap(Priority priority, String location) {
+    Map<String, Map<ExecutionType, TreeMap<Resource,
+        ResourceRequestInfo>>> locationMap = getLocationMap(priority);
+    if (locationMap == null) {
+      return null;
+    }
+    return locationMap.get(location);
+  }
+
+  TreeMap<Resource, ResourceRequestInfo> getCapabilityMap(Priority
+      priority, String location,
+      ExecutionType execType) {
+    Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+        executionTypeMap = getExecutionTypeMap(priority, location);
+    if (executionTypeMap == null) {
+      return null;
+    }
+    return executionTypeMap.get(execType);
+  }
+
+  @SuppressWarnings("unchecked")
+  List<ResourceRequestInfo> getAllResourceRequestInfos(Priority priority,
+      Collection<String> locations) {
+    List retList = new LinkedList<>();
+    for (String location : locations) {
+      for (ExecutionType eType : ExecutionType.values()) {
+        TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+            getCapabilityMap(priority, location, eType);
+        if (capabilityMap != null) {
+          retList.addAll(capabilityMap.values());
+        }
+      }
+    }
+    return retList;
+  }
+
+  List<ResourceRequestInfo> getMatchingRequests(
+      Priority priority, String resourceName, ExecutionType executionType,
+      Resource capability) {
+    List<ResourceRequestInfo> list = new LinkedList<>();
+    TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+        getCapabilityMap(priority, resourceName, executionType);
+    if (capabilityMap != null) {
+      ResourceRequestInfo resourceRequestInfo = capabilityMap.get(capability);
+      if (resourceRequestInfo != null) {
+        list.add(resourceRequestInfo);
+      } else {
+        list.addAll(capabilityMap.tailMap(capability).values());
+      }
+    }
+    return list;
+  }
+
+  @SuppressWarnings("unchecked")
+  ResourceRequestInfo addResourceRequest(Priority priority, String resourceName,
+      ExecutionTypeRequest execTypeReq, Resource capability, T req,
+      boolean relaxLocality, String labelExpression) {
+    ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
+        execTypeReq.getExecutionType(), capability);
+    if (resourceRequestInfo == null) {
+      resourceRequestInfo =
+          new ResourceRequestInfo(priority, resourceName, capability,
+              relaxLocality);
+      put(priority, resourceName, execTypeReq.getExecutionType(), capability,
+          resourceRequestInfo);
+    }
+    resourceRequestInfo.remoteRequest.setExecutionTypeRequest(execTypeReq);
+    resourceRequestInfo.remoteRequest.setNumContainers(
+        resourceRequestInfo.remoteRequest.getNumContainers() + 1);
+
+    if (relaxLocality) {
+      resourceRequestInfo.containerRequests.add(req);
+    }
+
+    if (ResourceRequest.ANY.equals(resourceName)) {
+      resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
+    }
+    return resourceRequestInfo;
+  }
+
+  ResourceRequestInfo decResourceRequest(Priority priority, String resourceName,
+      ExecutionTypeRequest execTypeReq, Resource capability, T req) {
+    ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
+        execTypeReq.getExecutionType(), capability);
+
+    if (resourceRequestInfo == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as ResourceRequestInfo with" +
+            "priority=" + priority + ", " +
+            "resourceName=" + resourceName + ", " +
+            "executionType=" + execTypeReq + ", " +
+            "capability=" + capability + " is not present in request table");
+      }
+      return null;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+          + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + resourceRequestInfo.remoteRequest.getNumContainers());
+    }
+
+    resourceRequestInfo.remoteRequest.setNumContainers(
+        resourceRequestInfo.remoteRequest.getNumContainers() - 1);
+
+    resourceRequestInfo.containerRequests.remove(req);
+
+    if (resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
+      // guard against spurious removals
+      resourceRequestInfo.remoteRequest.setNumContainers(0);
+    }
+    return resourceRequestInfo;
+  }
+
+  boolean isEmpty() {
+    return remoteRequestsTable.isEmpty();
+  }
+
+}

+ 197 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java

@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Base test case to be used for Testing frameworks that use AMRMProxy.
+ */
+public abstract class BaseAMRMProxyE2ETest {
+
+  protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient,
+      ApplicationId appId, MiniYARNCluster cluster,
+      final Configuration yarnConf)
+      throws IOException, InterruptedException, YarnException {
+
+    UserGroupInformation user = null;
+
+    // Get the AMRMToken from AMRMProxy
+
+    ApplicationReport report = rmClient.getApplicationReport(appId);
+
+    user = UserGroupInformation.createProxyUser(
+        report.getCurrentApplicationAttemptId().toString(),
+        UserGroupInformation.getCurrentUser());
+
+    ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster
+        .getNodeManager(0).getNMContext().getContainerManager();
+
+    AMRMProxyTokenSecretManager amrmTokenSecretManager =
+        containerManager.getAMRMProxyService().getSecretManager();
+    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
+        amrmTokenSecretManager
+            .createAndGetAMRMToken(report.getCurrentApplicationAttemptId());
+
+    SecurityUtil.setTokenService(token,
+        containerManager.getAMRMProxyService().getBindAddress());
+    user.addToken(token);
+
+    // Start Application Master
+
+    return user
+        .doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+          @Override
+          public ApplicationMasterProtocol run() throws Exception {
+            return ClientRMProxy.createRMProxy(yarnConf,
+                ApplicationMasterProtocol.class);
+          }
+        });
+  }
+
+  protected AllocateRequest createAllocateRequest(List<NodeReport> listNode) {
+    // The test needs AMRMClient to create a real allocate request
+    AMRMClientImpl<AMRMClient.ContainerRequest> amClient =
+        new AMRMClientImpl<>();
+
+    Resource capability = Resource.newInstance(1024, 2);
+    Priority priority = Priority.newInstance(1);
+    List<NodeReport> nodeReports = listNode;
+    String node = nodeReports.get(0).getNodeId().getHost();
+    String[] nodes = new String[] {node};
+
+    AMRMClient.ContainerRequest storedContainer1 =
+        new AMRMClient.ContainerRequest(capability, nodes, null, priority);
+    amClient.addContainerRequest(storedContainer1);
+    amClient.addContainerRequest(storedContainer1);
+
+    List<ResourceRequest> resourceAsk = new ArrayList<>();
+    for (ResourceRequest rr : amClient.ask) {
+      resourceAsk.add(rr);
+    }
+
+    ResourceBlacklistRequest resourceBlacklistRequest = ResourceBlacklistRequest
+        .newInstance(new ArrayList<>(), new ArrayList<>());
+
+    int responseId = 1;
+
+    return AllocateRequest.newInstance(responseId, 0, resourceAsk,
+        new ArrayList<>(), resourceBlacklistRequest);
+  }
+
+  protected ApplicationAttemptId createApp(YarnClient yarnClient,
+      MiniYARNCluster yarnCluster, Configuration conf) throws Exception {
+
+    ApplicationSubmissionContext appContext =
+        yarnClient.createApplication().getApplicationSubmissionContext();
+    ApplicationId appId = appContext.getApplicationId();
+
+    appContext.setApplicationName("Test");
+
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(0);
+    appContext.setPriority(pri);
+
+    appContext.setQueue("default");
+
+    ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
+        Collections.<String, LocalResource> emptyMap(),
+        new HashMap<String, String>(), Arrays.asList("sleep", "10000"),
+        new HashMap<String, ByteBuffer>(), null,
+        new HashMap<ApplicationAccessType, String>());
+    appContext.setAMContainerSpec(amContainer);
+    appContext.setResource(Resource.newInstance(1024, 1));
+
+    SubmitApplicationRequest appRequest =
+        Records.newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+
+    yarnClient.submitApplication(appContext);
+
+    RMAppAttempt appAttempt = null;
+    ApplicationAttemptId attemptId = null;
+    while (true) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport
+          .getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+        attemptId =
+            appReport.getCurrentApplicationAttemptId();
+        appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+            .get(attemptId.getApplicationId()).getCurrentAppAttempt();
+        while (true) {
+          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
+            break;
+          }
+        }
+        break;
+      }
+    }
+    Thread.sleep(1000);
+    // Just dig into the ResourceManager and get the AMRMToken just for the sake
+    // of testing.
+    UserGroupInformation.setLoginUser(UserGroupInformation
+        .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+
+    // emulate RM setup of AMRM token in credentials by adding the token
+    // *before* setting the token service
+    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+    appAttempt.getAMRMToken().setService(
+        ClientRMProxy.getAMRMTokenService(conf));
+    return attemptId;
+  }
+}

+ 16 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java

@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -413,11 +414,13 @@ public class TestAMRMClient {
       amClient.addContainerRequest(storedContainer3);
       
       // test addition and storage
-      int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
-       .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+      int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
+          ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+          .remoteRequest.getNumContainers();
       assertEquals(2, containersRequestedAny);
-      containersRequestedAny = amClient.remoteRequestsTable.get(priority1)
-          .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+      containersRequestedAny = amClient.remoteRequestsTable.get(priority1,
+          ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+          .remoteRequest.getNumContainers();
          assertEquals(1, containersRequestedAny);
       List<? extends Collection<ContainerRequest>> matches = 
           amClient.getMatchingRequests(priority, node, capability);
@@ -919,12 +922,15 @@ public class TestAMRMClient {
     amClient.removeContainerRequest(
         new ContainerRequest(capability, nodes, racks, priority));
     
-    int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
-        .get(node).get(capability).remoteRequest.getNumContainers();
-    int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
-        .get(rack).get(capability).remoteRequest.getNumContainers();
-    int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
-    .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+    int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
+        node, ExecutionType.GUARANTEED, capability).remoteRequest
+        .getNumContainers();
+    int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
+        rack, ExecutionType.GUARANTEED, capability).remoteRequest
+        .getNumContainers();
+    int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+        .remoteRequest.getNumContainers();
 
     assertEquals(2, containersRequestedNode);
     assertEquals(2, containersRequestedRack);

+ 52 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java

@@ -26,6 +26,8 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -35,6 +37,46 @@ import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
 import org.junit.Test;
 
 public class TestAMRMClientContainerRequest {
+
+  @Test
+  public void testOpportunisticAndGuaranteedRequests() {
+    AMRMClientImpl<ContainerRequest> client =
+        new AMRMClientImpl<ContainerRequest>();
+
+    Configuration conf = new Configuration();
+    conf.setClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        MyResolver.class, DNSToSwitchMapping.class);
+    client.init(conf);
+
+    Resource capability = Resource.newInstance(1024, 1);
+    ContainerRequest request =
+        new ContainerRequest(capability, new String[] {"host1", "host2"},
+            new String[] {"/rack2"}, Priority.newInstance(1));
+    client.addContainerRequest(request);
+    verifyResourceRequest(client, request, "host1", true);
+    verifyResourceRequest(client, request, "host2", true);
+    verifyResourceRequest(client, request, "/rack1", true);
+    verifyResourceRequest(client, request, "/rack2", true);
+    verifyResourceRequest(client, request, ResourceRequest.ANY, true);
+    ContainerRequest request2 =
+        new ContainerRequest(capability, new String[] {"host1", "host2"},
+            new String[] {"/rack2"}, Priority.newInstance(1), true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true));
+    client.addContainerRequest(request2);
+    verifyResourceRequest(client, request, "host1", true,
+        ExecutionType.OPPORTUNISTIC);
+    verifyResourceRequest(client, request, "host2", true,
+        ExecutionType.OPPORTUNISTIC);
+    verifyResourceRequest(client, request, "/rack1", true,
+        ExecutionType.OPPORTUNISTIC);
+    verifyResourceRequest(client, request, "/rack2", true,
+        ExecutionType.OPPORTUNISTIC);
+    verifyResourceRequest(client, request, ResourceRequest.ANY, true,
+        ExecutionType.OPPORTUNISTIC);
+  }
+
   @Test
   public void testFillInRacks() {
     AMRMClientImpl<ContainerRequest> client =
@@ -224,8 +266,16 @@ public class TestAMRMClientContainerRequest {
   private void verifyResourceRequest(
       AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
       String location, boolean expectedRelaxLocality) {
-    ResourceRequest ask =  client.remoteRequestsTable.get(request.getPriority())
-        .get(location).get(request.getCapability()).remoteRequest;
+    verifyResourceRequest(client, request, location, expectedRelaxLocality,
+        ExecutionType.GUARANTEED);
+  }
+
+  private void verifyResourceRequest(
+      AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
+      String location, boolean expectedRelaxLocality,
+      ExecutionType executionType) {
+    ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority(),
+        location, executionType, request.getCapability()).remoteRequest;
     assertEquals(location, ask.getResourceName());
     assertEquals(1, ask.getNumContainers());
     assertEquals(expectedRelaxLocality, ask.getRelaxLocality());

+ 14 - 157
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java

@@ -19,20 +19,12 @@
 package org.apache.hadoop.yarn.client.api.impl;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -40,43 +32,25 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class TestAMRMProxy {
+/**
+ * End-to-End test cases for the AMRMProxy Service.
+ */
+public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
 
   private static final Log LOG = LogFactory.getLog(TestAMRMProxy.class);
 
@@ -84,7 +58,7 @@ public class TestAMRMProxy {
    * This test validates register, allocate and finish of an application through
    * the AMRMPRoxy.
    */
-  @Test(timeout = 60000)
+  @Test(timeout = 120000)
   public void testAMRMProxyE2E() throws Exception {
     MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E", 1, 1, 1);
     YarnClient rmClient = null;
@@ -107,7 +81,8 @@ public class TestAMRMProxy {
 
       // Submit application
 
-      ApplicationId appId = createApp(rmClient, cluster);
+      ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf);
+      ApplicationId appId = appAttmptId.getApplicationId();
 
       client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
 
@@ -173,7 +148,7 @@ public class TestAMRMProxy {
    * that the received token it is different from the previous one within 5
    * requests.
    */
-  @Test(timeout = 60000)
+  @Test(timeout = 120000)
   public void testE2ETokenRenewal() throws Exception {
     MiniYARNCluster cluster =
         new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1);
@@ -201,7 +176,8 @@ public class TestAMRMProxy {
 
       // Submit
 
-      ApplicationId appId = createApp(rmClient, cluster);
+      ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf);
+      ApplicationId appId = appAttmptId.getApplicationId();
 
       client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
 
@@ -252,7 +228,7 @@ public class TestAMRMProxy {
    * This test validates that an AM cannot register directly to the RM, with the
    * token provided by the AMRMProxy.
    */
-  @Test(timeout = 60000)
+  @Test(timeout = 120000)
   public void testE2ETokenSwap() throws Exception {
     MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap", 1, 1, 1);
     YarnClient rmClient = null;
@@ -270,7 +246,8 @@ public class TestAMRMProxy {
       rmClient.init(yarnConf);
       rmClient.start();
 
-      ApplicationId appId = createApp(rmClient, cluster);
+      ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf);
+      ApplicationId appId = appAttmptId.getApplicationId();
 
       client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
 
@@ -290,124 +267,4 @@ public class TestAMRMProxy {
       cluster.stop();
     }
   }
-
-  protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient,
-      ApplicationId appId, MiniYARNCluster cluster,
-      final Configuration yarnConf)
-          throws IOException, InterruptedException, YarnException {
-
-    UserGroupInformation user = null;
-
-    // Get the AMRMToken from AMRMProxy
-
-    ApplicationReport report = rmClient.getApplicationReport(appId);
-
-    user = UserGroupInformation.createProxyUser(
-        report.getCurrentApplicationAttemptId().toString(),
-        UserGroupInformation.getCurrentUser());
-
-    ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster
-        .getNodeManager(0).getNMContext().getContainerManager();
-
-    AMRMProxyTokenSecretManager amrmTokenSecretManager =
-        containerManager.getAMRMProxyService().getSecretManager();
-    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
-        amrmTokenSecretManager
-            .createAndGetAMRMToken(report.getCurrentApplicationAttemptId());
-
-    SecurityUtil.setTokenService(token,
-        containerManager.getAMRMProxyService().getBindAddress());
-    user.addToken(token);
-
-    // Start Application Master
-
-    return user
-        .doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
-          @Override
-          public ApplicationMasterProtocol run() throws Exception {
-            return ClientRMProxy.createRMProxy(yarnConf,
-                ApplicationMasterProtocol.class);
-          }
-        });
-  }
-
-  protected AllocateRequest createAllocateRequest(List<NodeReport> listNode) {
-    // The test needs AMRMClient to create a real allocate request
-    AMRMClientImpl<ContainerRequest> amClient =
-        new AMRMClientImpl<ContainerRequest>();
-
-    Resource capability = Resource.newInstance(1024, 2);
-    Priority priority = Priority.newInstance(1);
-    List<NodeReport> nodeReports = listNode;
-    String node = nodeReports.get(0).getNodeId().getHost();
-    String[] nodes = new String[] { node };
-
-    ContainerRequest storedContainer1 =
-        new ContainerRequest(capability, nodes, null, priority);
-    amClient.addContainerRequest(storedContainer1);
-    amClient.addContainerRequest(storedContainer1);
-
-    List<ResourceRequest> resourceAsk = new ArrayList<ResourceRequest>();
-    for (ResourceRequest rr : amClient.ask) {
-      resourceAsk.add(rr);
-    }
-
-    ResourceBlacklistRequest resourceBlacklistRequest = ResourceBlacklistRequest
-        .newInstance(new ArrayList<String>(), new ArrayList<String>());
-
-    int responseId = 1;
-
-    return AllocateRequest.newInstance(responseId, 0, resourceAsk,
-        new ArrayList<ContainerId>(), resourceBlacklistRequest);
-  }
-
-  protected ApplicationId createApp(YarnClient yarnClient,
-      MiniYARNCluster yarnCluster) throws Exception {
-
-    ApplicationSubmissionContext appContext =
-        yarnClient.createApplication().getApplicationSubmissionContext();
-    ApplicationId appId = appContext.getApplicationId();
-
-    appContext.setApplicationName("Test");
-
-    Priority pri = Records.newRecord(Priority.class);
-    pri.setPriority(0);
-    appContext.setPriority(pri);
-
-    appContext.setQueue("default");
-
-    ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
-        Collections.<String, LocalResource> emptyMap(),
-        new HashMap<String, String>(), Arrays.asList("sleep", "10000"),
-        new HashMap<String, ByteBuffer>(), null,
-        new HashMap<ApplicationAccessType, String>());
-    appContext.setAMContainerSpec(amContainer);
-    appContext.setResource(Resource.newInstance(1024, 1));
-
-    SubmitApplicationRequest appRequest =
-        Records.newRecord(SubmitApplicationRequest.class);
-    appRequest.setApplicationSubmissionContext(appContext);
-
-    yarnClient.submitApplication(appContext);
-
-    RMAppAttempt appAttempt = null;
-    while (true) {
-      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-      if (appReport
-          .getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
-        ApplicationAttemptId attemptId =
-            appReport.getCurrentApplicationAttemptId();
-        appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
-            .get(attemptId.getApplicationId()).getCurrentAppAttempt();
-        while (true) {
-          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
-            break;
-          }
-        }
-        break;
-      }
-    }
-    Thread.sleep(1000);
-    return appId;
-  }
 }

+ 427 - 217
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java

@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,20 +22,31 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -43,12 +54,23 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.Ignore;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Validates End2End Distributed Scheduling flow which includes the AM
@@ -57,11 +79,70 @@ import java.util.List;
  * the NM and the DistributedSchedulingProtocol used by the framework to talk
  * to the DistributedSchedulingService running on the RM.
  */
-public class TestDistributedScheduling extends TestAMRMProxy {
+public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
 
   private static final Log LOG =
       LogFactory.getLog(TestDistributedScheduling.class);
 
+  protected MiniYARNCluster cluster;
+  protected YarnClient rmClient;
+  protected ApplicationMasterProtocol client;
+  protected Configuration conf;
+  protected Configuration yarnConf;
+  protected ApplicationAttemptId attemptId;
+  protected ApplicationId appId;
+
+  @Before
+  public void doBefore() throws Exception {
+    cluster = new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
+
+    conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
+    cluster.init(conf);
+    cluster.start();
+    yarnConf = cluster.getConfig();
+
+    // the client has to connect to AMRMProxy
+    yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
+    rmClient = YarnClient.createYarnClient();
+    rmClient.init(yarnConf);
+    rmClient.start();
+
+    // Submit application
+    attemptId = createApp(rmClient, cluster, conf);
+    appId = attemptId.getApplicationId();
+    client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
+  }
+
+  @After
+  public void doAfter() throws Exception {
+    if (client != null) {
+      try {
+        client.finishApplicationMaster(FinishApplicationMasterRequest
+            .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
+        rmClient.killApplication(attemptId.getApplicationId());
+        attemptId = null;
+      } catch (Exception e) {
+      }
+    }
+    if (rmClient != null) {
+      try {
+        rmClient.stop();
+      } catch (Exception e) {
+      }
+    }
+    if (cluster != null) {
+      try {
+        cluster.stop();
+      } catch (Exception e) {
+      }
+    }
+  }
+
+
   /**
    * Validates if Allocate Requests containing only OPPORTUNISTIC container
    * requests are satisfied instantly.
@@ -70,104 +151,63 @@ public class TestDistributedScheduling extends TestAMRMProxy {
    */
   @Test(timeout = 60000)
   public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
-    MiniYARNCluster cluster =
-        new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
-    YarnClient rmClient = null;
-    ApplicationMasterProtocol client;
-
-    try {
-      Configuration conf = new YarnConfiguration();
-      conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
-      conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
-      conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
-      cluster.init(conf);
-      cluster.start();
-      final Configuration yarnConf = cluster.getConfig();
-
-      // the client has to connect to AMRMProxy
-
-      yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
-          YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
-      rmClient = YarnClient.createYarnClient();
-      rmClient.init(yarnConf);
-      rmClient.start();
-
-      // Submit application
-
-      ApplicationId appId = createApp(rmClient, cluster);
-
-      client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
-
-      LOG.info("testDistributedSchedulingE2E - Register");
-
-      RegisterApplicationMasterResponse responseRegister =
-          client.registerApplicationMaster(RegisterApplicationMasterRequest
-              .newInstance(NetUtils.getHostname(), 1024, ""));
-
-      Assert.assertNotNull(responseRegister);
-      Assert.assertNotNull(responseRegister.getQueue());
-      Assert.assertNotNull(responseRegister.getApplicationACLs());
-      Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
-      Assert
-          .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
-      Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
-      Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
-
-      RMApp rmApp =
-          cluster.getResourceManager().getRMContext().getRMApps().get(appId);
-      Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
-
-      LOG.info("testDistributedSchedulingE2E - Allocate");
-
-      AllocateRequest request =
-          createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
-
-      // Replace 'ANY' requests with OPPORTUNISTIC aks and remove
-      // everything else
-      List<ResourceRequest> newAskList = new ArrayList<>();
-      for (ResourceRequest rr : request.getAskList()) {
-        if (ResourceRequest.ANY.equals(rr.getResourceName())) {
-          ResourceRequest newRR = ResourceRequest.newInstance(rr
-                  .getPriority(), rr.getResourceName(),
-              rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
-              rr.getNodeLabelExpression(),
-              ExecutionTypeRequest.newInstance(
-                  ExecutionType.OPPORTUNISTIC, true));
-          newAskList.add(newRR);
-        }
-      }
-      request.setAskList(newAskList);
-
-      AllocateResponse allocResponse = client.allocate(request);
-      Assert.assertNotNull(allocResponse);
-
-      // Ensure that all the requests are satisfied immediately
-      Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
-      // Verify that the allocated containers are OPPORTUNISTIC
-      for (Container allocatedContainer : allocResponse
-          .getAllocatedContainers()) {
-        ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
-            .newContainerTokenIdentifier(
-                allocatedContainer.getContainerToken());
-        Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-            containerTokenIdentifier.getExecutionType());
-      }
-
-      LOG.info("testDistributedSchedulingE2E - Finish");
-
-      FinishApplicationMasterResponse responseFinish =
-          client.finishApplicationMaster(FinishApplicationMasterRequest
-              .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
-
-      Assert.assertNotNull(responseFinish);
-
-    } finally {
-      if (rmClient != null) {
-        rmClient.stop();
+    LOG.info("testDistributedSchedulingE2E - Register");
+
+    RegisterApplicationMasterResponse responseRegister =
+        client.registerApplicationMaster(RegisterApplicationMasterRequest
+            .newInstance(NetUtils.getHostname(), 1024, ""));
+
+    Assert.assertNotNull(responseRegister);
+    Assert.assertNotNull(responseRegister.getQueue());
+    Assert.assertNotNull(responseRegister.getApplicationACLs());
+    Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
+    Assert
+        .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
+    Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
+    Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
+
+    RMApp rmApp =
+        cluster.getResourceManager().getRMContext().getRMApps().get(appId);
+    Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+
+    LOG.info("testDistributedSchedulingE2E - Allocate");
+
+    AllocateRequest request =
+        createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
+
+    // Replace 'ANY' requests with OPPORTUNISTIC aks and remove
+    // everything else
+    List<ResourceRequest> newAskList = new ArrayList<>();
+    for (ResourceRequest rr : request.getAskList()) {
+      if (ResourceRequest.ANY.equals(rr.getResourceName())) {
+        ResourceRequest newRR = ResourceRequest.newInstance(rr
+                .getPriority(), rr.getResourceName(),
+            rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
+            rr.getNodeLabelExpression(),
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true));
+        newAskList.add(newRR);
       }
-      cluster.stop();
     }
+    request.setAskList(newAskList);
+
+    AllocateResponse allocResponse = client.allocate(request);
+    Assert.assertNotNull(allocResponse);
+
+    // Ensure that all the requests are satisfied immediately
+    Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+    // Verify that the allocated containers are OPPORTUNISTIC
+    for (Container allocatedContainer : allocResponse
+        .getAllocatedContainers()) {
+      ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+          .newContainerTokenIdentifier(
+              allocatedContainer.getContainerToken());
+      Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+          containerTokenIdentifier.getExecutionType());
+    }
+
+    LOG.info("testDistributedSchedulingE2E - Finish");
   }
 
   /**
@@ -178,135 +218,305 @@ public class TestDistributedScheduling extends TestAMRMProxy {
    */
   @Test(timeout = 60000)
   public void testMixedExecutionTypeRequestE2E() throws Exception {
-    MiniYARNCluster cluster =
-        new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
-    YarnClient rmClient = null;
-    ApplicationMasterProtocol client;
+    LOG.info("testDistributedSchedulingE2E - Register");
+
+    RegisterApplicationMasterResponse responseRegister =
+        client.registerApplicationMaster(RegisterApplicationMasterRequest
+            .newInstance(NetUtils.getHostname(), 1024, ""));
+
+    Assert.assertNotNull(responseRegister);
+    Assert.assertNotNull(responseRegister.getQueue());
+    Assert.assertNotNull(responseRegister.getApplicationACLs());
+    Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
+    Assert
+        .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
+    Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
+    Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
+
+    RMApp rmApp =
+        cluster.getResourceManager().getRMContext().getRMApps().get(appId);
+    Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+
+    LOG.info("testDistributedSchedulingE2E - Allocate");
+
+    AllocateRequest request =
+        createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
+    List<ResourceRequest> askList = request.getAskList();
+    List<ResourceRequest> newAskList = new ArrayList<>(askList);
+
+    // Duplicate all ANY requests marking them as opportunistic
+    for (ResourceRequest rr : askList) {
+      if (ResourceRequest.ANY.equals(rr.getResourceName())) {
+        ResourceRequest newRR = ResourceRequest.newInstance(rr
+                .getPriority(), rr.getResourceName(),
+            rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
+            rr.getNodeLabelExpression(),
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true));
+        newAskList.add(newRR);
+      }
+    }
+    request.setAskList(newAskList);
+
+    AllocateResponse allocResponse = client.allocate(request);
+    Assert.assertNotNull(allocResponse);
+
+    // Ensure that all the requests are satisfied immediately
+    Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+    // Verify that the allocated containers are OPPORTUNISTIC
+    for (Container allocatedContainer : allocResponse
+        .getAllocatedContainers()) {
+      ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+          .newContainerTokenIdentifier(
+              allocatedContainer.getContainerToken());
+      Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+          containerTokenIdentifier.getExecutionType());
+    }
+
+    request.setAskList(new ArrayList<ResourceRequest>());
+    request.setResponseId(request.getResponseId() + 1);
 
+    Thread.sleep(1000);
+
+    // RM should allocate GUARANTEED containers within 2 calls to allocate()
+    allocResponse = client.allocate(request);
+    Assert.assertNotNull(allocResponse);
+    Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+
+    // Verify that the allocated containers are GUARANTEED
+    for (Container allocatedContainer : allocResponse
+        .getAllocatedContainers()) {
+      ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+          .newContainerTokenIdentifier(
+              allocatedContainer.getContainerToken());
+      Assert.assertEquals(ExecutionType.GUARANTEED,
+          containerTokenIdentifier.getExecutionType());
+    }
+
+    LOG.info("testDistributedSchedulingE2E - Finish");
+  }
+
+  /**
+   * Validates if AMRMClient can be used with Distributed Scheduling turned on.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  @SuppressWarnings("unchecked")
+  public void testAMRMClient() throws Exception {
+    AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
     try {
-      Configuration conf = new YarnConfiguration();
-      conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
-      conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
-      conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
-      cluster.init(conf);
-      cluster.start();
-      final Configuration yarnConf = cluster.getConfig();
-
-      // the client has to connect to AMRMProxy
-
-      yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
-          YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
-      rmClient = YarnClient.createYarnClient();
-      rmClient.init(yarnConf);
-      rmClient.start();
-
-      // Submit application
-
-      ApplicationId appId = createApp(rmClient, cluster);
-
-      client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
-
-      LOG.info("testDistributedSchedulingE2E - Register");
-
-      RegisterApplicationMasterResponse responseRegister =
-          client.registerApplicationMaster(RegisterApplicationMasterRequest
-              .newInstance(NetUtils.getHostname(), 1024, ""));
-
-      Assert.assertNotNull(responseRegister);
-      Assert.assertNotNull(responseRegister.getQueue());
-      Assert.assertNotNull(responseRegister.getApplicationACLs());
-      Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
-      Assert
-          .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
-      Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
-      Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
-
-      RMApp rmApp =
-          cluster.getResourceManager().getRMContext().getRMApps().get(appId);
-      Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
-
-      LOG.info("testDistributedSchedulingE2E - Allocate");
-
-      AllocateRequest request =
-          createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
-      List<ResourceRequest> askList = request.getAskList();
-      List<ResourceRequest> newAskList = new ArrayList<>(askList);
-
-      // Duplicate all ANY requests marking them as opportunistic
-      for (ResourceRequest rr : askList) {
-        if (ResourceRequest.ANY.equals(rr.getResourceName())) {
-          ResourceRequest newRR = ResourceRequest.newInstance(rr
-              .getPriority(), rr.getResourceName(),
-              rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
-              rr.getNodeLabelExpression(),
+      Priority priority = Priority.newInstance(1);
+      Priority priority2 = Priority.newInstance(2);
+      Resource capability = Resource.newInstance(1024, 1);
+
+      List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
+      String node = nodeReports.get(0).getNodeId().getHost();
+      String rack = nodeReports.get(0).getRackName();
+      String[] nodes = new String[]{node};
+      String[] racks = new String[]{rack};
+
+      // start am rm client
+      amClient = new AMRMClientImpl(client);
+      amClient.init(yarnConf);
+      amClient.start();
+      amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
+
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      amClient.addContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.addContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.addContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.addContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.addContainerRequest(
+          new AMRMClient.ContainerRequest(capability, null, null, priority2,
+              true, null,
+              ExecutionTypeRequest.newInstance(
+                  ExecutionType.OPPORTUNISTIC, true)));
+      amClient.addContainerRequest(
+          new AMRMClient.ContainerRequest(capability, null, null, priority2,
+              true, null,
+              ExecutionTypeRequest.newInstance(
+                  ExecutionType.OPPORTUNISTIC, true)));
+
+      amClient.removeContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.removeContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.removeContainerRequest(
+          new AMRMClient.ContainerRequest(capability, null, null, priority2,
+              true, null,
               ExecutionTypeRequest.newInstance(
-                  ExecutionType.OPPORTUNISTIC, true));
-          newAskList.add(newRR);
+                  ExecutionType.OPPORTUNISTIC, true)));
+
+      int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
+          node, ExecutionType.GUARANTEED, capability).remoteRequest
+          .getNumContainers();
+      int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
+          rack, ExecutionType.GUARANTEED, capability).remoteRequest
+          .getNumContainers();
+      int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
+          ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+          .remoteRequest.getNumContainers();
+      int oppContainersRequestedAny =
+          amClient.remoteRequestsTable.get(priority2, ResourceRequest.ANY,
+              ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+              .getNumContainers();
+
+      assertEquals(2, containersRequestedNode);
+      assertEquals(2, containersRequestedRack);
+      assertEquals(2, containersRequestedAny);
+      assertEquals(1, oppContainersRequestedAny);
+
+      assertEquals(4, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      // RM should allocate container within 2 calls to allocate()
+      int allocatedContainerCount = 0;
+      int iterationsLeft = 10;
+      Set<ContainerId> releases = new TreeSet<>();
+
+      amClient.getNMTokenCache().clearCache();
+      Assert.assertEquals(0,
+          amClient.getNMTokenCache().numberOfTokensInCache());
+      HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+      while (allocatedContainerCount <
+          (containersRequestedAny + oppContainersRequestedAny)
+          && iterationsLeft-- > 0) {
+        AllocateResponse allocResponse = amClient.allocate(0.1f);
+        assertEquals(0, amClient.ask.size());
+        assertEquals(0, amClient.release.size());
+
+        allocatedContainerCount += allocResponse.getAllocatedContainers()
+            .size();
+        for (Container container : allocResponse.getAllocatedContainers()) {
+          ContainerId rejectContainerId = container.getId();
+          releases.add(rejectContainerId);
         }
-      }
-      request.setAskList(newAskList);
-
-      AllocateResponse allocResponse = client.allocate(request);
-      Assert.assertNotNull(allocResponse);
-
-      // Ensure that all the requests are satisfied immediately
-      Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
-      // Verify that the allocated containers are OPPORTUNISTIC
-      for (Container allocatedContainer : allocResponse
-          .getAllocatedContainers()) {
-        ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
-            .newContainerTokenIdentifier(
-            allocatedContainer.getContainerToken());
-        Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-            containerTokenIdentifier.getExecutionType());
-      }
-
-      request.setAskList(new ArrayList<ResourceRequest>());
-      request.setResponseId(request.getResponseId() + 1);
-
-      Thread.sleep(1000);
 
-      // RM should allocate GUARANTEED containers within 2 calls to allocate()
-      allocResponse = client.allocate(request);
-      Assert.assertNotNull(allocResponse);
-      Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
+        for (NMToken token : allocResponse.getNMTokens()) {
+          String nodeID = token.getNodeId().toString();
+          receivedNMTokens.put(nodeID, token.getToken());
+        }
 
-      // Verify that the allocated containers are GUARANTEED
-      for (Container allocatedContainer : allocResponse
-          .getAllocatedContainers()) {
-        ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
-            .newContainerTokenIdentifier(
-                allocatedContainer.getContainerToken());
-        Assert.assertEquals(ExecutionType.GUARANTEED,
-            containerTokenIdentifier.getExecutionType());
+        if (allocatedContainerCount < containersRequestedAny) {
+          // sleep to let NM's heartbeat to RM and trigger allocations
+          sleep(100);
+        }
       }
 
-      LOG.info("testDistributedSchedulingE2E - Finish");
+      assertEquals(allocatedContainerCount,
+          containersRequestedAny + oppContainersRequestedAny);
+      for (ContainerId rejectContainerId : releases) {
+        amClient.releaseAssignedContainer(rejectContainerId);
+      }
+      assertEquals(3, amClient.release.size());
+      assertEquals(0, amClient.ask.size());
+
+      // need to tell the AMRMClient that we dont need these resources anymore
+      amClient.removeContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.removeContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+      amClient.removeContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
+              true, null,
+              ExecutionTypeRequest.newInstance(
+                  ExecutionType.OPPORTUNISTIC, true)));
+      assertEquals(4, amClient.ask.size());
+
+      // test RPC exception handling
+      amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
+          nodes, racks, priority));
+      amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
+          nodes, racks, priority));
+      amClient.addContainerRequest(
+          new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
+              true, null,
+              ExecutionTypeRequest.newInstance(
+                  ExecutionType.OPPORTUNISTIC, true)));
+
+      final AMRMClient amc = amClient;
+      ApplicationMasterProtocol realRM = amClient.rmClient;
+      try {
+        ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol
+            .class);
+        when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer(
+            new Answer<AllocateResponse>() {
+              public AllocateResponse answer(InvocationOnMock invocation)
+                  throws Exception {
+                amc.removeContainerRequest(
+                    new AMRMClient.ContainerRequest(capability, nodes,
+                        racks, priority));
+                amc.removeContainerRequest(
+                    new AMRMClient.ContainerRequest(capability, nodes, racks,
+                        priority));
+                amc.removeContainerRequest(
+                    new AMRMClient.ContainerRequest(capability, null, null,
+                        priority2, true, null,
+                        ExecutionTypeRequest.newInstance(
+                            ExecutionType.OPPORTUNISTIC, true)));
+                throw new Exception();
+              }
+            });
+        amClient.rmClient = mockRM;
+        amClient.allocate(0.1f);
+      } catch (Exception ioe) {
+      } finally {
+        amClient.rmClient = realRM;
+      }
 
-      FinishApplicationMasterResponse responseFinish =
-          client.finishApplicationMaster(FinishApplicationMasterRequest
-              .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
+      assertEquals(3, amClient.release.size());
+      assertEquals(6, amClient.ask.size());
+
+      iterationsLeft = 3;
+      // do a few iterations to ensure RM is not going send new containers
+      while (iterationsLeft-- > 0) {
+        // inform RM of rejection
+        AllocateResponse allocResponse = amClient.allocate(0.1f);
+        // RM did not send new containers because AM does not need any
+        assertEquals(0, allocResponse.getAllocatedContainers().size());
+        if (allocResponse.getCompletedContainersStatuses().size() > 0) {
+          for (ContainerStatus cStatus : allocResponse
+              .getCompletedContainersStatuses()) {
+            if (releases.contains(cStatus.getContainerId())) {
+              assertEquals(cStatus.getState(), ContainerState.COMPLETE);
+              assertEquals(-100, cStatus.getExitStatus());
+              releases.remove(cStatus.getContainerId());
+            }
+          }
+        }
+        if (iterationsLeft > 0) {
+          // sleep to make sure NM's heartbeat
+          sleep(100);
+        }
+      }
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
 
-      Assert.assertNotNull(responseFinish);
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
 
     } finally {
-      if (rmClient != null) {
-        rmClient.stop();
+      if (amClient != null && amClient.getServiceState() == Service.STATE
+          .STARTED) {
+        amClient.stop();
       }
-      cluster.stop();
     }
   }
 
-  @Ignore
-  @Override
-  public void testAMRMProxyE2E() throws Exception { }
-
-  @Ignore
-  @Override
-  public void testE2ETokenRenewal() throws Exception { }
-
-  @Ignore
-  @Override
-  public void testE2ETokenSwap() throws Exception { }
+  private void sleep(int sleepTime) {
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
 }

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -251,9 +252,9 @@ public class TestNMClient {
           racks, priority));
     }
 
-    int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
-        .get(ResourceRequest.ANY).get(capability).remoteRequest
-        .getNumContainers();
+    int containersRequestedAny = rmClient.remoteRequestsTable.get(priority,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+        .remoteRequest.getNumContainers();
 
     // RM should allocate container within 2 calls to allocate()
     int allocatedContainerCount = 0;

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java

@@ -214,7 +214,8 @@ public class ResourceRequestPBImpl extends  ResourceRequest {
         + ", # Containers: " + getNumContainers()
         + ", Location: " + getResourceName()
         + ", Relax Locality: " + getRelaxLocality()
-        + ", Execution Spec: " + getExecutionTypeRequest() + "}";
+        + ", Execution Type Request: " + getExecutionTypeRequest()
+        + ", Node Label Expression: " + getNodeLabelExpression() + "}";
   }
 
   @Override
@@ -235,4 +236,4 @@ public class ResourceRequestPBImpl extends  ResourceRequest {
     }
     builder.setNodeLabelExpression(nodeLabelExpression);
   }
-}
+}