
YARN-6050. AMs can't be scheduled on racks or nodes (rkanter)

Robert Kanter, 8 years ago
Commit a65011a128
32 files changed, 1212 insertions(+), 140 deletions(-)
  1. +3 -1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
  2. +1 -1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
  3. +46 -11  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
  4. +1 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
  5. +49 -15  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
  6. +22 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
  7. +77 -27  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
  8. +59 -9  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
  9. +2 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
  10. +11 -9  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  11. +19 -16  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
  12. +5 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
  13. +63 -2  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
  14. +9 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
  15. +6 -4  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
  16. +1 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
  17. +40 -5  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
  18. +192 -3  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
  19. +5 -3  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
  20. +184 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java
  21. +297 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMServerUtils.java
  22. +3 -3  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
  23. +2 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
  24. +6 -5  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
  25. +6 -3  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
  26. +10 -9  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
  27. +3 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
  28. +3 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
  29. +2 -2  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
  30. +2 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
  31. +78 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
  32. +5 -5  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java

+ 3 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -589,7 +590,8 @@ public class YARNRunner implements ClientProtocol {
       amResourceRequest.setCapability(capability);
       amResourceRequest.setNumContainers(1);
       amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
-      appContext.setAMContainerResourceRequest(amResourceRequest);
+      appContext.setAMContainerResourceRequests(
+          Collections.singletonList(amResourceRequest));
     }
     // set labels for the Job containers
     appContext.setNodeLabelExpression(jobConf

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java

@@ -571,7 +571,7 @@ public class TestYARNRunner {
         buildSubmitContext(yarnRunner, jobConf);
 
     assertEquals(appSubCtx.getNodeLabelExpression(), "GPU");
-    assertEquals(appSubCtx.getAMContainerResourceRequest()
+    assertEquals(appSubCtx.getAMContainerResourceRequests().get(0)
         .getNodeLabelExpression(), "highMem");
   }
 

+ 46 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.api.records;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -100,7 +102,7 @@ public abstract class ApplicationSubmissionContext {
     amReq.setNumContainers(1);
     amReq.setRelaxLocality(true);
     amReq.setNodeLabelExpression(amContainerLabelExpression);
-    context.setAMContainerResourceRequest(amReq);
+    context.setAMContainerResourceRequests(Collections.singletonList(amReq));
     return context;
   }
   
@@ -159,7 +161,8 @@ public abstract class ApplicationSubmissionContext {
     context.setApplicationType(applicationType);
     context.setKeepContainersAcrossApplicationAttempts(keepContainers);
     context.setNodeLabelExpression(appLabelExpression);
-    context.setAMContainerResourceRequest(resourceRequest);
+    context.setAMContainerResourceRequests(
+        Collections.singletonList(resourceRequest));
     return context;
   }
 
@@ -454,29 +457,61 @@ public abstract class ApplicationSubmissionContext {
   public abstract void setNodeLabelExpression(String nodeLabelExpression);
   
   /**
-   * Get ResourceRequest of AM container, if this is not null, scheduler will
-   * use this to acquire resource for AM container.
-   * 
+   * Get the ResourceRequest of the AM container.
+   *
+   * If this is not null, scheduler will use this to acquire resource for AM
+   * container.
+   *
    * If this is null, scheduler will assemble a ResourceRequest by using
    * <em>getResource</em> and <em>getPriority</em> of
    * <em>ApplicationSubmissionContext</em>.
-   * 
-   * Number of containers and Priority will be ignore.
-   * 
-   * @return ResourceRequest of AM container
+   *
+   * Number of containers and Priority will be ignored.
+   *
+   * @return ResourceRequest of the AM container
+   * @deprecated See {@link #getAMContainerResourceRequests()}
    */
   @Public
   @Evolving
+  @Deprecated
   public abstract ResourceRequest getAMContainerResourceRequest();
   
   /**
-   * Set ResourceRequest of AM container
-   * @param request of AM container
+   * Set ResourceRequest of the AM container
+   * @param request of the AM container
+   * @deprecated See {@link #setAMContainerResourceRequests(List)}
    */
   @Public
   @Evolving
+  @Deprecated
   public abstract void setAMContainerResourceRequest(ResourceRequest request);
 
+  /**
+   * Get the ResourceRequests of the AM container.
+   *
+   * If this is not null, scheduler will use this to acquire resource for AM
+   * container.
+   *
+   * If this is null, scheduler will use the ResourceRequest as determined by
+   * <em>getAMContainerResourceRequest</em> and its behavior.
+   *
+   * Number of containers and Priority will be ignored.
+   *
+   * @return List of ResourceRequests of the AM container
+   */
+  @Public
+  @Evolving
+  public abstract List<ResourceRequest> getAMContainerResourceRequests();
+
+  /**
+   * Set ResourceRequests of the AM container.
+   * @param requests of the AM container
+   */
+  @Public
+  @Evolving
+  public abstract void setAMContainerResourceRequests(
+      List<ResourceRequest> requests);
+
   /**
    * Get the attemptFailuresValidityInterval in milliseconds for the application
    *
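
A minimal sketch of how a client might use the new list-based API, assuming an existing ApplicationSubmissionContext named appContext (imports from org.apache.hadoop.yarn.api.records elided; the rack and host names are illustrative, not from this patch):

    List<ResourceRequest> amReqs = new ArrayList<>();
    // One request with ResourceRequest.ANY is mandatory; the rack and host
    // entries describe where the AM container should be placed.
    amReqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
        ResourceRequest.ANY, Resource.newInstance(2048, 1), 1));
    amReqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
        "/rack1", Resource.newInstance(2048, 1), 1));
    amReqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
        "host1.example.com", Resource.newInstance(2048, 1), 1));
    appContext.setAMContainerResourceRequests(amReqs);
    // The deprecated single-request setter still works and is stored as a
    // one-element list.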

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

@@ -383,7 +383,7 @@ message ApplicationSubmissionContextProto {
   optional LogAggregationContextProto log_aggregation_context = 14;
   optional ReservationIdProto reservation_id = 15;
   optional string node_label_expression = 16;
-  optional ResourceRequestProto am_container_resource_request = 17;
+  repeated ResourceRequestProto am_container_resource_request = 17;
   repeated ApplicationTimeoutMapProto application_timeouts = 18;
 }
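
Because the field number and wire type are unchanged, switching this field from optional to repeated stays wire-compatible: a context serialized by an older client with a single am_container_resource_request deserializes on the new side as a one-element list.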
 

+ 49 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.api.records.impl.pb;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -66,7 +68,7 @@ extends ApplicationSubmissionContext {
   private ContainerLaunchContext amContainer = null;
   private Resource resource = null;
   private Set<String> applicationTags = null;
-  private ResourceRequest amResourceRequest = null;
+  private List<ResourceRequest> amResourceRequests = null;
   private LogAggregationContext logAggregationContext = null;
   private ReservationId reservationId = null;
   private Map<ApplicationTimeoutType, Long> applicationTimeouts = null;
@@ -127,9 +129,10 @@ extends ApplicationSubmissionContext {
       builder.clearApplicationTags();
       builder.addAllApplicationTags(this.applicationTags);
     }
-    if (this.amResourceRequest != null) {
-      builder.setAmContainerResourceRequest(
-          convertToProtoFormat(this.amResourceRequest));
+    if (this.amResourceRequests != null) {
+      builder.clearAmContainerResourceRequest();
+      builder.addAllAmContainerResourceRequest(
+          convertToProtoFormat(this.amResourceRequests));
     }
     if (this.logAggregationContext != null) {
       builder.setLogAggregationContext(
@@ -430,13 +433,23 @@ extends ApplicationSubmissionContext {
   private PriorityProto convertToProtoFormat(Priority t) {
     return ((PriorityPBImpl)t).getProto();
   }
-  
-  private ResourceRequestPBImpl convertFromProtoFormat(ResourceRequestProto p) {
-    return new ResourceRequestPBImpl(p);
+
+  private List<ResourceRequest> convertFromProtoFormat(
+      List<ResourceRequestProto> ps) {
+    List<ResourceRequest> rs = new ArrayList<>();
+    for (ResourceRequestProto p : ps) {
+      rs.add(new ResourceRequestPBImpl(p));
+    }
+    return rs;
   }
 
-  private ResourceRequestProto convertToProtoFormat(ResourceRequest t) {
-    return ((ResourceRequestPBImpl)t).getProto();
+  private List<ResourceRequestProto> convertToProtoFormat(
+      List<ResourceRequest> ts) {
+    List<ResourceRequestProto> rs = new ArrayList<>(ts.size());
+    for (ResourceRequest t : ts) {
+      rs.add(((ResourceRequestPBImpl)t).getProto());
+    }
+    return rs;
   }
 
   private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
@@ -485,25 +498,46 @@ extends ApplicationSubmissionContext {
   }
   
   @Override
+  @Deprecated
   public ResourceRequest getAMContainerResourceRequest() {
+    List<ResourceRequest> reqs = getAMContainerResourceRequests();
+    if (reqs == null || reqs.isEmpty()) {
+      return null;
+    }
+    return getAMContainerResourceRequests().get(0);
+  }
+
+  @Override
+  public List<ResourceRequest> getAMContainerResourceRequests() {
     ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
-    if (this.amResourceRequest != null) {
-      return amResourceRequest;
+    if (this.amResourceRequests != null) {
+      return amResourceRequests;
     } // Else via proto
-    if (!p.hasAmContainerResourceRequest()) {
+    if (p.getAmContainerResourceRequestCount() == 0) {
       return null;
     }
-    amResourceRequest = convertFromProtoFormat(p.getAmContainerResourceRequest());
-    return amResourceRequest;
+    amResourceRequests =
+        convertFromProtoFormat(p.getAmContainerResourceRequestList());
+    return amResourceRequests;
   }
 
   @Override
+  @Deprecated
   public void setAMContainerResourceRequest(ResourceRequest request) {
     maybeInitBuilder();
     if (request == null) {
       builder.clearAmContainerResourceRequest();
     }
-    this.amResourceRequest = request;
+    this.amResourceRequests = Collections.singletonList(request);
+  }
+
+  @Override
+  public void setAMContainerResourceRequests(List<ResourceRequest> requests) {
+    maybeInitBuilder();
+    if (requests == null) {
+      builder.clearAmContainerResourceRequest();
+    }
+    this.amResourceRequests = requests;
   }
 
   @Override

+ 22 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java

@@ -801,6 +801,28 @@ public class CommonNodeLabelsManager extends AbstractService {
     }
   }
 
+  /**
+   * Get nodes that have no labels.
+   *
+   * @return set of nodes with no labels
+   */
+  public Set<NodeId> getNodesWithoutALabel() {
+    try {
+      readLock.lock();
+      Set<NodeId> nodes = new HashSet<>();
+      for (Host host : nodeCollections.values()) {
+        for (NodeId nodeId : host.nms.keySet()) {
+          if (getLabelsByNode(nodeId).isEmpty()) {
+            nodes.add(nodeId);
+          }
+        }
+      }
+      return Collections.unmodifiableSet(nodes);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
 
   /**
    * Get mapping of labels to nodes for all the labels.

+ 77 - 27
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java

@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.util.Collections;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -31,6 +33,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -335,14 +339,16 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
     // has been disabled. Reject the recovery of this application if it
     // is true and give clear message so that user can react properly.
     if (!appContext.getUnmanagedAM() &&
-        application.getAMResourceRequest() == null &&
+        (application.getAMResourceRequests() == null ||
+            application.getAMResourceRequests().isEmpty()) &&
         !YarnConfiguration.areNodeLabelsEnabled(this.conf)) {
       // check application submission context and see if am resource request
       // or application itself contains any node label expression.
-      ResourceRequest amReqFromAppContext =
-          appContext.getAMContainerResourceRequest();
-      String labelExp = (amReqFromAppContext != null) ?
-          amReqFromAppContext.getNodeLabelExpression() : null;
+      List<ResourceRequest> amReqsFromAppContext =
+          appContext.getAMContainerResourceRequests();
+      String labelExp =
+          (amReqsFromAppContext != null && !amReqsFromAppContext.isEmpty()) ?
+          amReqsFromAppContext.get(0).getNodeLabelExpression() : null;
       if (labelExp == null) {
         labelExp = appContext.getNodeLabelExpression();
       }
@@ -377,9 +383,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
     }
 
     ApplicationId applicationId = submissionContext.getApplicationId();
-    ResourceRequest amReq = null;
+    List<ResourceRequest> amReqs = null;
     try {
-      amReq = validateAndCreateResourceRequest(submissionContext, isRecovery);
+      amReqs = validateAndCreateResourceRequest(submissionContext, isRecovery);
     } catch (InvalidLabelResourceRequestException e) {
       // This can happen if the application had been submitted and run
       // with Node Label enabled but recover with Node Label disabled.
@@ -440,7 +446,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
         submissionContext.getApplicationName(), user,
         submissionContext.getQueue(), submissionContext, this.scheduler,
         this.masterService, submitTime, submissionContext.getApplicationType(),
-        submissionContext.getApplicationTags(), amReq);
+        submissionContext.getApplicationTags(), amReqs);
 
     // Concurrent app submissions with same applicationId will fail here
     // Concurrent app submissions with different applicationIds will not
@@ -462,7 +468,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
     return application;
   }
 
-  private ResourceRequest validateAndCreateResourceRequest(
+  private List<ResourceRequest> validateAndCreateResourceRequest(
       ApplicationSubmissionContext submissionContext, boolean isRecovery)
       throws InvalidResourceRequestException {
     // Validation of the ApplicationSubmissionContext needs to be completed
@@ -472,33 +478,77 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
 
     // Check whether AM resource requirements are within required limits
     if (!submissionContext.getUnmanagedAM()) {
-      ResourceRequest amReq = submissionContext.getAMContainerResourceRequest();
-      if (amReq == null) {
-        amReq = BuilderUtils
-            .newResourceRequest(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
-                ResourceRequest.ANY, submissionContext.getResource(), 1);
-      }
-
-      // set label expression for AM container
-      if (null == amReq.getNodeLabelExpression()) {
-        amReq.setNodeLabelExpression(submissionContext
-            .getNodeLabelExpression());
+      List<ResourceRequest> amReqs =
+          submissionContext.getAMContainerResourceRequests();
+      if (amReqs == null || amReqs.isEmpty()) {
+        if (submissionContext.getResource() != null) {
+          amReqs = Collections.singletonList(BuilderUtils
+              .newResourceRequest(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
+                  ResourceRequest.ANY, submissionContext.getResource(), 1));
+        } else {
+          throw new InvalidResourceRequestException("Invalid resource request, "
+              + "no resources requested");
+        }
       }
 
       try {
-        SchedulerUtils.normalizeAndValidateRequest(amReq,
-            scheduler.getMaximumResourceCapability(),
-            submissionContext.getQueue(), scheduler, isRecovery, rmContext);
+        // Find the ANY request and ensure there's only one
+        ResourceRequest anyReq = null;
+        for (ResourceRequest amReq : amReqs) {
+          if (amReq.getResourceName().equals(ResourceRequest.ANY)) {
+            if (anyReq == null) {
+              anyReq = amReq;
+            } else {
+              throw new InvalidResourceRequestException("Invalid resource "
+                  + "request, only one resource request with "
+                  + ResourceRequest.ANY + " is allowed");
+            }
+          }
+        }
+        if (anyReq == null) {
+          throw new InvalidResourceRequestException("Invalid resource request, "
+              + "no resource request specified with " + ResourceRequest.ANY);
+        }
+
+        // Make sure that all of the requests agree with the ANY request
+        // and have correct values
+        for (ResourceRequest amReq : amReqs) {
+          amReq.setCapability(anyReq.getCapability());
+          amReq.setExecutionTypeRequest(
+              ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
+          amReq.setNumContainers(1);
+          amReq.setPriority(RMAppAttemptImpl.AM_CONTAINER_PRIORITY);
+        }
+
+        // set label expression for AM ANY request if not set
+        if (null == anyReq.getNodeLabelExpression()) {
+          anyReq.setNodeLabelExpression(submissionContext
+              .getNodeLabelExpression());
+        }
+
+        // Put ANY request at the front
+        if (!amReqs.get(0).equals(anyReq)) {
+          amReqs.remove(anyReq);
+          amReqs.add(0, anyReq);
+        }
+
+        // Normalize all requests
+        for (ResourceRequest amReq : amReqs) {
+          SchedulerUtils.normalizeAndValidateRequest(amReq,
+              scheduler.getMaximumResourceCapability(),
+              submissionContext.getQueue(), scheduler, isRecovery, rmContext);
+
+          amReq.setCapability(
+              scheduler.getNormalizedResource(amReq.getCapability()));
+        }
+        return amReqs;
       } catch (InvalidResourceRequestException e) {
         LOG.warn("RM app submission failed in validating AM resource request"
             + " for application " + submissionContext.getApplicationId(), e);
         throw e;
       }
-
-      amReq.setCapability(scheduler.getNormalizedResource(amReq.getCapability()));
-      return amReq;
     }
-    
+
     return null;
   }
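
A hedged illustration of what this normalization does to a submission (values are invented and imports elided):

    // Client-provided AM requests, deliberately inconsistent:
    List<ResourceRequest> amReqs = new ArrayList<>();
    amReqs.add(ResourceRequest.newInstance(Priority.newInstance(5),
        "/rack1", Resource.newInstance(2048, 2), 4));
    amReqs.add(ResourceRequest.newInstance(Priority.newInstance(5),
        ResourceRequest.ANY, Resource.newInstance(1024, 1), 1));
    // After validateAndCreateResourceRequest:
    //  - the ANY request is moved to index 0;
    //  - every request is forced to the ANY capability (1024 MB, 1 vcore),
    //    GUARANTEED execution type, one container and AM_CONTAINER_PRIORITY;
    //  - a list with zero or multiple ANY entries, or a submission with
    //    neither AM requests nor a context resource, is rejected with
    //    InvalidResourceRequestException.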
 

+ 59 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java

@@ -21,13 +21,16 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.io.IOException;
 import java.text.ParseException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.AccessControlException;
@@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -564,19 +568,65 @@ public class RMServerUtils {
    *
    * @param rmContext context
    * @param conf configuration
-   * @param amreq am resource request
+   * @param amReqs am resource requests
    * @return applicable node count
    */
   public static int getApplicableNodeCountForAM(RMContext rmContext,
-      Configuration conf, ResourceRequest amreq) {
+      Configuration conf, List<ResourceRequest> amReqs) {
+    // Determine the list of nodes that are eligible based on the strict
+    // resource requests
+    Set<NodeId> nodesForReqs = new HashSet<>();
+    for (ResourceRequest amReq : amReqs) {
+      if (amReq.getRelaxLocality() &&
+          !amReq.getResourceName().equals(ResourceRequest.ANY)) {
+        nodesForReqs.addAll(
+            rmContext.getScheduler().getNodeIds(amReq.getResourceName()));
+      }
+    }
+
     if (YarnConfiguration.areNodeLabelsEnabled(conf)) {
-      RMNodeLabelsManager labelManager = rmContext.getNodeLabelManager();
-      String amNodeLabelExpression = amreq.getNodeLabelExpression();
-      amNodeLabelExpression = (amNodeLabelExpression == null
-          || amNodeLabelExpression.trim().isEmpty())
-              ? RMNodeLabelsManager.NO_LABEL : amNodeLabelExpression;
-      return labelManager.getActiveNMCountPerLabel(amNodeLabelExpression);
+      // Determine the list of nodes that are eligible based on the node label
+      String amNodeLabelExpression = amReqs.get(0).getNodeLabelExpression();
+      Set<NodeId> nodesForLabels =
+          getNodeIdsForLabel(rmContext, amNodeLabelExpression);
+      if (nodesForLabels != null && !nodesForLabels.isEmpty()) {
+        // If only node labels, strip out any wildcard NodeIds and return
+        if (nodesForReqs.isEmpty()) {
+          for (Iterator<NodeId> it = nodesForLabels.iterator(); it.hasNext();) {
+            if (it.next().getPort() == 0) {
+              it.remove();
+            }
+          }
+          return nodesForLabels.size();
+        } else {
+          // The NodeIds common to both the strict resource requests and the
+          // node label is the eligible set
+          return Sets.intersection(nodesForReqs, nodesForLabels).size();
+        }
+      }
+    }
+
+    // If no strict resource request NodeIds nor node label NodeIds, then just
+    // return the entire cluster
+    if (nodesForReqs.isEmpty()) {
+      return rmContext.getScheduler().getNumClusterNodes();
+    }
+    // No node label NodeIds, so return the strict resource request NodeIds
+    return nodesForReqs.size();
+  }
+
+  private static Set<NodeId> getNodeIdsForLabel(RMContext rmContext,
+      String label) {
+    label = (label == null || label.trim().isEmpty())
+        ? RMNodeLabelsManager.NO_LABEL : label;
+    if (label.equals(RMNodeLabelsManager.NO_LABEL)) {
+      // NO_LABEL nodes aren't tracked directly
+      return rmContext.getNodeLabelManager().getNodesWithoutALabel();
+    } else {
+      Map<String, Set<NodeId>> labelsToNodes =
+          rmContext.getNodeLabelManager().getLabelsToNodes(
+              Collections.singleton(label));
+      return labelsToNodes.get(label);
     }
-    return rmContext.getScheduler().getNumClusterNodes();
   }
 }
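
To make the node counting concrete, a made-up example: if the AM's rack and host requests resolve to nodes {n1, n2, n3} and, with node labels enabled, its label expression maps to {n2, n3, n4}, the blacklist threshold is computed from the two-node intersection {n2, n3}. If the label yields no nodes, the three request-derived nodes are used, and with neither constraint the count falls back to the total number of cluster nodes, as before.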

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -246,7 +247,7 @@ public interface RMApp extends EventHandler<RMAppEvent> {
 
   ReservationId getReservationId();
   
-  ResourceRequest getAMResourceRequest();
+  List<ResourceRequest> getAMResourceRequests();
 
   Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp();
 

+ 11 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -194,7 +194,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private RMAppEvent eventCausingFinalSaving;
   private RMAppState targetedFinalState;
   private RMAppState recoveredFinalState;
-  private ResourceRequest amReq;
+  private List<ResourceRequest> amReqs;
   
   private CallerContext callerContext;
 
@@ -405,8 +405,8 @@ public class RMAppImpl implements RMApp, Recoverable {
       Configuration config, String name, String user, String queue,
       ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
       ApplicationMasterService masterService, long submitTime,
-      String applicationType, Set<String> applicationTags, 
-      ResourceRequest amReq) {
+      String applicationType, Set<String> applicationTags,
+      List<ResourceRequest> amReqs) {
 
     this.systemClock = SystemClock.getInstance();
 
@@ -425,7 +425,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.startTime = this.systemClock.getTime();
     this.applicationType = applicationType;
     this.applicationTags = applicationTags;
-    this.amReq = amReq;
+    this.amReqs = amReqs;
     if (submissionContext.getPriority() != null) {
       this.applicationPriority = Priority
           .newInstance(submissionContext.getPriority().getPriority());
@@ -919,7 +919,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       if (amBlacklistingEnabled && !submissionContext.getUnmanagedAM()) {
         currentAMBlacklistManager = new SimpleBlacklistManager(
             RMServerUtils.getApplicableNodeCountForAM(rmContext, conf,
-                getAMResourceRequest()),
+                getAMResourceRequests()),
             blacklistDisableThreshold);
       } else {
         currentAMBlacklistManager = new DisabledBlacklistManager();
@@ -927,7 +927,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
     RMAppAttempt attempt =
         new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
-          submissionContext, conf, amReq, this, currentAMBlacklistManager);
+          submissionContext, conf, amReqs, this, currentAMBlacklistManager);
     attempts.put(appAttemptId, attempt);
     currentAttempt = attempt;
   }
@@ -1605,8 +1605,8 @@ public class RMAppImpl implements RMApp, Recoverable {
   }
   
   @Override
-  public ResourceRequest getAMResourceRequest() {
-    return this.amReq; 
+  public List<ResourceRequest> getAMResourceRequests() {
+    return this.amReqs;
   }
 
   @Override
@@ -1879,7 +1879,9 @@ public class RMAppImpl implements RMApp, Recoverable {
   public String getAmNodeLabelExpression() {
     String amNodeLabelExpression = null;
     if (!getApplicationSubmissionContext().getUnmanagedAM()) {
-      amNodeLabelExpression = getAMResourceRequest().getNodeLabelExpression();
+      amNodeLabelExpression =
+          getAMResourceRequests() != null && !getAMResourceRequests().isEmpty()
+              ? getAMResourceRequests().get(0).getNodeLabelExpression() : null;
       amNodeLabelExpression = (amNodeLabelExpression == null)
           ? NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET : amNodeLabelExpression;
       amNodeLabelExpression = (amNodeLabelExpression.trim().isEmpty())

+ 19 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -192,7 +192,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   private Object transitionTodo;
   
   private RMAppAttemptMetrics attemptMetrics = null;
-  private ResourceRequest amReq = null;
+  private List<ResourceRequest> amReqs = null;
   private BlacklistManager blacklistedNodesForAM = null;
 
   private String amLaunchDiagnostics;
@@ -485,16 +485,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       RMContext rmContext, YarnScheduler scheduler,
       ApplicationMasterService masterService,
       ApplicationSubmissionContext submissionContext,
-      Configuration conf, ResourceRequest amReq, RMApp rmApp) {
+      Configuration conf, List<ResourceRequest> amReqs, RMApp rmApp) {
     this(appAttemptId, rmContext, scheduler, masterService, submissionContext,
-        conf, amReq, rmApp, new DisabledBlacklistManager());
+        conf, amReqs, rmApp, new DisabledBlacklistManager());
   }
 
   public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
       RMContext rmContext, YarnScheduler scheduler,
       ApplicationMasterService masterService,
       ApplicationSubmissionContext submissionContext,
-      Configuration conf, ResourceRequest amReq, RMApp rmApp,
+      Configuration conf, List<ResourceRequest> amReqs, RMApp rmApp,
       BlacklistManager amBlacklistManager) {
     this.conf = conf;
     this.applicationAttemptId = appAttemptId;
@@ -514,7 +514,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     this.attemptMetrics =
         new RMAppAttemptMetrics(applicationAttemptId, rmContext);
 
-    this.amReq = amReq;
+    this.amReqs = amReqs;
     this.blacklistedNodesForAM = amBlacklistManager;
 
     final int diagnosticsLimitKC = getDiagnosticsLimitKCOrThrow(conf);
@@ -1092,18 +1092,21 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         // will be passed to scheduler, and scheduler will deduct the number after
         // AM container allocated
         
-        // Currently, following fields are all hard code,
+        // Currently, following fields are all hard coded,
         // TODO: change these fields when we want to support
-        // priority/resource-name/relax-locality specification for AM containers
-        // allocation.
-        appAttempt.amReq.setNumContainers(1);
-        appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
-        appAttempt.amReq.setResourceName(ResourceRequest.ANY);
-        appAttempt.amReq.setRelaxLocality(true);
-
-        appAttempt.getAMBlacklistManager().refreshNodeHostCount(
+        // priority or multiple containers AM container allocation.
+        for (ResourceRequest amReq : appAttempt.amReqs) {
+          amReq.setNumContainers(1);
+          amReq.setPriority(AM_CONTAINER_PRIORITY);
+        }
+
+        int numNodes =
             RMServerUtils.getApplicableNodeCountForAM(appAttempt.rmContext,
-                appAttempt.conf, appAttempt.amReq));
+                appAttempt.conf, appAttempt.amReqs);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting node count for blacklist to " + numNodes);
+        }
+        appAttempt.getAMBlacklistManager().refreshNodeHostCount(numNodes);
 
         ResourceBlacklistRequest amBlacklist =
             appAttempt.getAMBlacklistManager().getBlacklistUpdates();
@@ -1116,7 +1119,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         Allocation amContainerAllocation =
             appAttempt.scheduler.allocate(
                 appAttempt.applicationAttemptId,
-                Collections.singletonList(appAttempt.amReq),
+                appAttempt.amReqs,
                 EMPTY_CONTAINER_RELEASE_LIST,
                 amBlacklist.getBlacklistAdditions(),
                 amBlacklist.getBlacklistRemovals(),

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -1162,4 +1162,9 @@ public abstract class AbstractYarnScheduler
     return SchedulerUtils.createOpportunisticRmContainer(
         rmContext, demotedContainer, false);
   }
+
+  @Override
+  public List<NodeId> getNodeIds(String resourceName) {
+    return nodeTracker.getNodeIdsByResourceName(resourceName);
+  }
 }

+ 63 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java

@@ -268,6 +268,9 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
 
   /**
    * Convenience method to filter nodes based on a condition.
+   *
+   * @param nodeFilter A {@link NodeFilter} for filtering the nodes
+   * @return A list of filtered nodes
    */
   public List<N> getNodes(NodeFilter nodeFilter) {
     List<N> nodeList = new ArrayList<>();
@@ -288,6 +291,37 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     return nodeList;
   }
 
+  public List<NodeId> getAllNodeIds() {
+    return getNodeIds(null);
+  }
+
+  /**
+   * Convenience method to filter nodes based on a condition.
+   *
+   * @param nodeFilter A {@link NodeFilter} for filtering the nodes
+   * @return A list of filtered nodes
+   */
+  public List<NodeId> getNodeIds(NodeFilter nodeFilter) {
+    List<NodeId> nodeList = new ArrayList<>();
+    readLock.lock();
+    try {
+      if (nodeFilter == null) {
+        for (N node : nodes.values()) {
+          nodeList.add(node.getNodeID());
+        }
+      } else {
+        for (N node : nodes.values()) {
+          if (nodeFilter.accept(node)) {
+            nodeList.add(node.getNodeID());
+          }
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return nodeList;
+  }
+
   /**
    * Convenience method to sort nodes.
    *
@@ -320,11 +354,38 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
         resourceName != null && !resourceName.isEmpty());
     List<N> retNodes = new ArrayList<>();
     if (ResourceRequest.ANY.equals(resourceName)) {
-      return getAllNodes();
+      retNodes.addAll(getAllNodes());
     } else if (nodeNameToNodeMap.containsKey(resourceName)) {
       retNodes.add(nodeNameToNodeMap.get(resourceName));
     } else if (nodesPerRack.containsKey(resourceName)) {
-      return nodesPerRack.get(resourceName);
+      retNodes.addAll(nodesPerRack.get(resourceName));
+    } else {
+      LOG.info(
+          "Could not find a node matching given resourceName " + resourceName);
+    }
+    return retNodes;
+  }
+
+  /**
+   * Convenience method to return list of {@link NodeId} corresponding to
+   * resourceName passed in the {@link ResourceRequest}.
+   *
+   * @param resourceName Host/rack name of the resource, or
+   * {@link ResourceRequest#ANY}
+   * @return list of {@link NodeId} that match the resourceName
+   */
+  public List<NodeId> getNodeIdsByResourceName(final String resourceName) {
+    Preconditions.checkArgument(
+        resourceName != null && !resourceName.isEmpty());
+    List<NodeId> retNodes = new ArrayList<>();
+    if (ResourceRequest.ANY.equals(resourceName)) {
+      retNodes.addAll(getAllNodeIds());
+    } else if (nodeNameToNodeMap.containsKey(resourceName)) {
+      retNodes.add(nodeNameToNodeMap.get(resourceName).getNodeID());
+    } else if (nodesPerRack.containsKey(resourceName)) {
+      for (N node : nodesPerRack.get(resourceName)) {
+        retNodes.add(node.getNodeID());
+      }
     } else {
       LOG.info(
           "Could not find a node matching given resourceName " + resourceName);

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java

@@ -19,10 +19,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 
@@ -49,4 +51,11 @@ public interface ResourceScheduler extends YarnScheduler, Recoverable {
    * @throws IOException
    */
   void reinitialize(Configuration conf, RMContext rmContext) throws IOException;
+
+  /**
+   * Get the {@link NodeId}s available in the cluster by resource name.
+   * @param resourceName resource name
+   * @return the list of {@link NodeId}s matching the given resource name.
+   */
+  List<NodeId> getNodeIds(String resourceName);
 }
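
A rough usage sketch of the new lookup, assuming a ResourceScheduler instance named scheduler (the resource names are illustrative):

    List<NodeId> rackNodes = scheduler.getNodeIds("/rack1");            // nodes on that rack
    List<NodeId> oneHost = scheduler.getNodeIds("host1.example.com");   // that host, if registered
    List<NodeId> allNodes = scheduler.getNodeIds(ResourceRequest.ANY);  // every node in the cluster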

+ 6 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -139,18 +139,20 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     Resource amResource;
     String partition;
 
-    if (rmApp == null || rmApp.getAMResourceRequest() == null) {
+    if (rmApp == null || rmApp.getAMResourceRequests() == null
+        || rmApp.getAMResourceRequests().isEmpty()) {
       // the rmApp may be undefined (the resource manager checks for this too)
       // and unmanaged applications do not provide an amResource request
       // in these cases, provide a default using the scheduler
       amResource = rmContext.getScheduler().getMinimumResourceCapability();
       partition = CommonNodeLabelsManager.NO_LABEL;
     } else {
-      amResource = rmApp.getAMResourceRequest().getCapability();
+      amResource = rmApp.getAMResourceRequests().get(0).getCapability();
       partition =
-          (rmApp.getAMResourceRequest().getNodeLabelExpression() == null)
+          (rmApp.getAMResourceRequests().get(0)
+              .getNodeLabelExpression() == null)
           ? CommonNodeLabelsManager.NO_LABEL
-          : rmApp.getAMResourceRequest().getNodeLabelExpression();
+          : rmApp.getAMResourceRequests().get(0).getNodeLabelExpression();
     }
 
     setAppAMNodePartitionName(partition);

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java

@@ -229,7 +229,7 @@ public class AppInfo {
       appNodeLabelExpression =
           app.getApplicationSubmissionContext().getNodeLabelExpression();
       amNodeLabelExpression = (unmanagedApplication) ? null
-          : app.getAMResourceRequest().getNodeLabelExpression();
+          : app.getAMResourceRequests().get(0).getNodeLabelExpression();
 
       // Setting partition based resource usage of application
       ResourceScheduler scheduler = rm.getRMContext().getScheduler();

+ 40 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -24,6 +24,7 @@ import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -678,6 +679,17 @@ public class MockRM extends ResourceManager {
         tokensConf);
   }
 
+  public RMApp submitApp(List<ResourceRequest> amResourceRequests)
+      throws Exception {
+    return submitApp(amResourceRequests, "app1",
+        "user", null, false, null,
+        super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
+        false, false, null, 0, null, true,
+        amResourceRequests.get(0).getPriority(),
+        amResourceRequests.get(0).getNodeLabelExpression(), null, null);
+  }
+
   public RMApp submitApp(Resource capability, String name, String user,
       Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
       int maxAppAttempts, Credentials ts, String appType,
@@ -688,6 +700,30 @@ public class MockRM extends ResourceManager {
       Map<ApplicationTimeoutType, Long> applicationTimeouts,
       ByteBuffer tokensConf)
       throws Exception {
+    priority = (priority == null) ? Priority.newInstance(0) : priority;
+    ResourceRequest amResourceRequest = ResourceRequest.newInstance(
+        priority, ResourceRequest.ANY, capability, 1);
+    if (amLabel != null && !amLabel.isEmpty()) {
+      amResourceRequest.setNodeLabelExpression(amLabel.trim());
+    }
+    return submitApp(Collections.singletonList(amResourceRequest), name, user,
+        acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted,
+        keepContainers, isAppIdProvided, applicationId,
+        attemptFailuresValidityInterval, logAggregationContext,
+        cancelTokensWhenComplete, priority, amLabel, applicationTimeouts,
+        tokensConf);
+  }
+
+  public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
+      String user, Map<ApplicationAccessType, String> acls, boolean unmanaged,
+      String queue, int maxAppAttempts, Credentials ts, String appType,
+      boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
+      ApplicationId applicationId, long attemptFailuresValidityInterval,
+      LogAggregationContext logAggregationContext,
+      boolean cancelTokensWhenComplete, Priority priority, String amLabel,
+      Map<ApplicationTimeoutType, Long> applicationTimeouts,
+      ByteBuffer tokensConf)
+      throws Exception {
     ApplicationId appId = isAppIdProvided ? applicationId : null;
     ApplicationClientProtocol client = getClientRMService();
     if (! isAppIdProvided) {
@@ -718,7 +754,6 @@ public class MockRM extends ResourceManager {
     sub.setApplicationType(appType);
     ContainerLaunchContext clc = Records
         .newRecord(ContainerLaunchContext.class);
-    sub.setResource(capability);
     clc.setApplicationACLs(acls);
     if (ts != null && UserGroupInformation.isSecurityEnabled()) {
       DataOutputBuffer dob = new DataOutputBuffer();
@@ -733,12 +768,12 @@ public class MockRM extends ResourceManager {
       sub.setLogAggregationContext(logAggregationContext);
     }
     sub.setCancelTokensWhenComplete(cancelTokensWhenComplete);
-    ResourceRequest amResourceRequest = ResourceRequest.newInstance(
-        Priority.newInstance(0), ResourceRequest.ANY, capability, 1);
     if (amLabel != null && !amLabel.isEmpty()) {
-      amResourceRequest.setNodeLabelExpression(amLabel.trim());
+      for (ResourceRequest amResourceRequest : amResourceRequests) {
+        amResourceRequest.setNodeLabelExpression(amLabel.trim());
+      }
     }
-    sub.setAMContainerResourceRequest(amResourceRequest);
+    sub.setAMContainerResourceRequests(amResourceRequests);
     req.setApplicationSubmissionContext(sub);
     UserGroupInformation fakeUser =
       UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
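
A rough sketch of how a test could exercise the new overload (the configuration and host name are assumptions, not part of this patch):

    MockRM rm = new MockRM(conf);
    rm.start();
    List<ResourceRequest> amReqs = new ArrayList<>();
    amReqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
        ResourceRequest.ANY, Resource.newInstance(1024, 1), 1));
    amReqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
        "127.0.0.1", Resource.newInstance(1024, 1), 1));
    RMApp app = rm.submitApp(amReqs);  // AM request list is passed through to the RM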

+ 192 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java

@@ -31,6 +31,8 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
@@ -50,6 +52,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -57,11 +61,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
@@ -72,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -307,7 +314,7 @@ public class TestAppManager{
     ResourceRequest resReg =
         ResourceRequest.newInstance(Priority.newInstance(0),
             ResourceRequest.ANY, Resource.newInstance(1024, 1), 1);
-    sub.setAMContainerResourceRequest(resReg);
+    sub.setAMContainerResourceRequests(Collections.singletonList(resReg));
     req.setApplicationSubmissionContext(sub);
     sub.setAMContainerSpec(mock(ContainerLaunchContext.class));
     try {
@@ -517,8 +524,157 @@ public class TestAppManager{
     Assert.assertEquals("app event type is wrong before", RMAppEventType.KILL, appEventType);
   }
 
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testRMAppSubmitAMContainerResourceRequests() throws Exception {
+    asContext.setResource(Resources.createResource(1024));
+    asContext.setAMContainerResourceRequest(
+        ResourceRequest.newInstance(Priority.newInstance(0),
+            ResourceRequest.ANY, Resources.createResource(1024), 1, true));
+    List<ResourceRequest> reqs = new ArrayList<>();
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
+        ResourceRequest.ANY, Resources.createResource(1025), 1, false));
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
+        "/rack", Resources.createResource(1025), 1, false));
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(0),
+        "/rack/node", Resources.createResource(1025), 1, true));
+    asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs));
+    // getAMContainerResourceRequest uses the first entry of
+    // getAMContainerResourceRequests
+    Assert.assertEquals(reqs.get(0), asContext.getAMContainerResourceRequest());
+    Assert.assertEquals(reqs, asContext.getAMContainerResourceRequests());
+    RMApp app = testRMAppSubmit();
+    for (ResourceRequest req : reqs) {
+      req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+    }
+    // setAMContainerResourceRequests has priority over
+    // setAMContainerResourceRequest and setResource
+    Assert.assertEquals(reqs, app.getAMResourceRequests());
+  }
+
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testRMAppSubmitAMContainerResourceRequest() throws Exception {
+    asContext.setResource(Resources.createResource(1024));
+    asContext.setAMContainerResourceRequests(null);
+    ResourceRequest req =
+        ResourceRequest.newInstance(Priority.newInstance(0),
+            ResourceRequest.ANY, Resources.createResource(1025), 1, true);
+    asContext.setAMContainerResourceRequest(cloneResourceRequest(req));
+    // getAMContainerResourceRequests uses a singleton list of
+    // getAMContainerResourceRequest
+    Assert.assertEquals(req, asContext.getAMContainerResourceRequest());
+    Assert.assertEquals(req, asContext.getAMContainerResourceRequests().get(0));
+    Assert.assertEquals(1, asContext.getAMContainerResourceRequests().size());
+    RMApp app = testRMAppSubmit();
+    req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+    // setAMContainerResourceRequest has priority over setResource
+    Assert.assertEquals(Collections.singletonList(req),
+        app.getAMResourceRequests());
+  }
+
   @Test
-  public void testRMAppSubmit() throws Exception {
+  public void testRMAppSubmitResource() throws Exception {
+    asContext.setResource(Resources.createResource(1024));
+    asContext.setAMContainerResourceRequests(null);
+    RMApp app = testRMAppSubmit();
+    // setResource
+    Assert.assertEquals(Collections.singletonList(
+        ResourceRequest.newInstance(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
+        ResourceRequest.ANY, Resources.createResource(1024), 1, true, "")),
+        app.getAMResourceRequests());
+  }
+
+  @Test
+  public void testRMAppSubmitNoResourceRequests() throws Exception {
+    asContext.setResource(null);
+    asContext.setAMContainerResourceRequests(null);
+    try {
+      testRMAppSubmit();
+      Assert.fail("Should have failed due to no ResourceRequest");
+    } catch (InvalidResourceRequestException e) {
+      Assert.assertEquals(
+          "Invalid resource request, no resources requested",
+          e.getMessage());
+    }
+  }
+
+  @Test
+  public void testRMAppSubmitAMContainerResourceRequestsDisagree()
+      throws Exception {
+    asContext.setResource(null);
+    List<ResourceRequest> reqs = new ArrayList<>();
+    ResourceRequest anyReq = ResourceRequest.newInstance(
+        Priority.newInstance(1),
+        ResourceRequest.ANY, Resources.createResource(1024), 1, false, "label1",
+        ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
+    reqs.add(anyReq);
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(2),
+        "/rack", Resources.createResource(1025), 2, false, "",
+        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)));
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(3),
+        "/rack/node", Resources.createResource(1026), 3, true, "",
+        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC)));
+    asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs));
+    RMApp app = testRMAppSubmit();
+    // It should force the requests to all agree on these points
+    for (ResourceRequest req : reqs) {
+      req.setCapability(anyReq.getCapability());
+      req.setExecutionTypeRequest(
+          ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
+      req.setNumContainers(1);
+      req.setPriority(Priority.newInstance(0));
+    }
+    Assert.assertEquals(reqs, app.getAMResourceRequests());
+  }
+
+  @Test
+  public void testRMAppSubmitAMContainerResourceRequestsNoAny()
+      throws Exception {
+    asContext.setResource(null);
+    List<ResourceRequest> reqs = new ArrayList<>();
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(1),
+        "/rack", Resources.createResource(1025), 1, false));
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(1),
+        "/rack/node", Resources.createResource(1025), 1, true));
+    asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs));
+    // getAMContainerResourceRequest uses the first entry of
+    // getAMContainerResourceRequests
+    Assert.assertEquals(reqs, asContext.getAMContainerResourceRequests());
+    try {
+      testRMAppSubmit();
+      Assert.fail("Should have failed due to missing ANY ResourceRequest");
+    } catch (InvalidResourceRequestException e) {
+      Assert.assertEquals(
+          "Invalid resource request, no resource request specified with *",
+          e.getMessage());
+    }
+  }
+
+  @Test
+  public void testRMAppSubmitAMContainerResourceRequestsTwoManyAny()
+      throws Exception {
+    asContext.setResource(null);
+    List<ResourceRequest> reqs = new ArrayList<>();
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(1),
+        ResourceRequest.ANY, Resources.createResource(1025), 1, false));
+    reqs.add(ResourceRequest.newInstance(Priority.newInstance(1),
+        ResourceRequest.ANY, Resources.createResource(1025), 1, false));
+    asContext.setAMContainerResourceRequests(cloneResourceRequests(reqs));
+    // getAMContainerResourceRequest uses the first entry of
+    // getAMContainerResourceRequests
+    Assert.assertEquals(reqs, asContext.getAMContainerResourceRequests());
+    try {
+      testRMAppSubmit();
+      Assert.fail("Should have failed due to too many ANY ResourceRequests");
+    } catch (InvalidResourceRequestException e) {
+      Assert.assertEquals(
+          "Invalid resource request, only one resource request with * is " +
+              "allowed", e.getMessage());
+    }
+  }
+
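Read together, testRMAppSubmitNoResourceRequests, the "NoAny" test, and the "TwoManyAny" test pin down the submission-time validation of the AM request list. A minimal sketch of that rule, assuming it operates on the final request list; it mirrors the asserted error messages rather than the actual RMAppManager/scheduler validation code.

import java.util.List;

import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;

final class AmRequestValidationSketch {
  // There must be at least one request, and exactly one of them must use
  // the ANY ("*") resource name.
  static void validate(List<ResourceRequest> amReqs)
      throws InvalidResourceRequestException {
    if (amReqs == null || amReqs.isEmpty()) {
      throw new InvalidResourceRequestException(
          "Invalid resource request, no resources requested");
    }
    int anyCount = 0;
    for (ResourceRequest req : amReqs) {
      if (ResourceRequest.ANY.equals(req.getResourceName())) {
        anyCount++;
      }
    }
    if (anyCount == 0) {
      throw new InvalidResourceRequestException(
          "Invalid resource request, no resource request specified with *");
    }
    if (anyCount > 1) {
      throw new InvalidResourceRequestException(
          "Invalid resource request, only one resource request with * is "
              + "allowed");
    }
  }
}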
+  private RMApp testRMAppSubmit() throws Exception {
     appMonitor.submitApplication(asContext, "test");
     RMApp app = rmContext.getRMApps().get(appId);
     Assert.assertNotNull("app is null", app);
@@ -529,12 +685,14 @@ public class TestAppManager{
 
     // wait for event to be processed
     int timeoutSecs = 0;
-    while ((getAppEventType() == RMAppEventType.KILL) && 
+    while ((getAppEventType() == RMAppEventType.KILL) &&
         timeoutSecs++ < 20) {
       Thread.sleep(1000);
     }
     Assert.assertEquals("app event type sent is wrong", RMAppEventType.START,
         getAppEventType());
+
+    return app;
   }
 
   @Test
@@ -732,6 +890,15 @@ public class TestAppManager{
     ResourceCalculator rs = mock(ResourceCalculator.class);
     when(scheduler.getResourceCalculator()).thenReturn(rs);
 
+    when(scheduler.getNormalizedResource((Resource) any()))
+        .thenAnswer(new Answer<Resource>() {
+      @Override
+      public Resource answer(InvocationOnMock invocationOnMock)
+          throws Throwable {
+        return (Resource) invocationOnMock.getArguments()[0];
+      }
+    });
+
     return scheduler;
   }
 
@@ -748,4 +915,26 @@ public class TestAppManager{
         YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
   }
 
+  private static ResourceRequest cloneResourceRequest(ResourceRequest req) {
+    return ResourceRequest.newInstance(
+        Priority.newInstance(req.getPriority().getPriority()),
+        new String(req.getResourceName()),
+        Resource.newInstance(req.getCapability().getMemorySize(),
+            req.getCapability().getVirtualCores()),
+        req.getNumContainers(),
+        req.getRelaxLocality(),
+        req.getNodeLabelExpression() != null
+            ? new String(req.getNodeLabelExpression()) : null,
+        ExecutionTypeRequest.newInstance(
+            req.getExecutionTypeRequest().getExecutionType()));
+  }
+
+  private static List<ResourceRequest> cloneResourceRequests(
+      List<ResourceRequest> reqs) {
+    List<ResourceRequest> cloneReqs = new ArrayList<>();
+    for (ResourceRequest req : reqs) {
+      cloneReqs.add(cloneResourceRequest(req));
+    }
+    return cloneReqs;
+  }
 }

+ 5 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java

@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1039,9 +1040,9 @@ public class TestClientRMService {
         spy(new RMAppImpl(applicationId3, rmContext, config, null, null,
             queueName, asContext, yarnScheduler, null,
             System.currentTimeMillis(), "YARN", null,
-            BuilderUtils.newResourceRequest(
+            Collections.singletonList(BuilderUtils.newResourceRequest(
                 RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
-                Resource.newInstance(1024, 1), 1)){
+                Resource.newInstance(1024, 1), 1))){
                   @Override
                   public ApplicationReport createAndGetApplicationReport(
                       String clientUserName, boolean allowAccess) {
@@ -1055,7 +1056,8 @@ public class TestClientRMService {
                     return report;
                   }
               });
-    app.getAMResourceRequest().setNodeLabelExpression(amNodeLabelExpression);
+    app.getAMResourceRequests().get(0)
+        .setNodeLabelExpression(amNodeLabelExpression);
     ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
         ApplicationId.newInstance(123456, 1), 1);
     RMAppAttemptImpl rmAppAttemptImpl = spy(new RMAppAttemptImpl(attemptId,

+ 184 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -28,6 +29,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -156,6 +160,186 @@ public class TestNodeBlacklistingOnAMFailures {
         currentNode.getNodeId(), allocatedContainers.get(0).getNodeId());
   }
 
+  @Test(timeout = 100000)
+  public void testNodeBlacklistingOnAMFailureStrictNodeLocality()
+      throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
+        true);
+
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm = startRM(conf, dispatcher);
+    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Register 5 nodes so that we can blacklist at least one if the AM
+    // container fails; with the default threshold, 5 nodes * 0.2 = 1.
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    MockNM nm2 =
+        new MockNM("127.0.0.2:2345", 8000, rm.getResourceTrackerService());
+    nm2.registerNode();
+
+    MockNM nm3 =
+        new MockNM("127.0.0.3:2345", 8000, rm.getResourceTrackerService());
+    nm3.registerNode();
+
+    MockNM nm4 =
+        new MockNM("127.0.0.4:2345", 8000, rm.getResourceTrackerService());
+    nm4.registerNode();
+
+    MockNM nm5 =
+        new MockNM("127.0.0.5:2345", 8000, rm.getResourceTrackerService());
+    nm5.registerNode();
+
+    // Specify a strict locality on nm2
+    List<ResourceRequest> reqs = new ArrayList<>();
+    ResourceRequest nodeReq = ResourceRequest.newInstance(
+        Priority.newInstance(0), nm2.getNodeId().getHost(),
+        Resource.newInstance(200, 1), 1, true);
+    ResourceRequest rackReq = ResourceRequest.newInstance(
+        Priority.newInstance(0), "/default-rack",
+        Resource.newInstance(200, 1), 1, false);
+    ResourceRequest anyReq = ResourceRequest.newInstance(
+        Priority.newInstance(0), ResourceRequest.ANY,
+        Resource.newInstance(200, 1), 1, false);
+    reqs.add(anyReq);
+    reqs.add(rackReq);
+    reqs.add(nodeReq);
+    RMApp app = rm.submitApp(reqs);
+
+    MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm2);
+    ContainerId amContainerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    RMContainer rmContainer = scheduler.getRMContainer(amContainerId);
+    NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();
+    Assert.assertEquals(nm2.getNodeId(), nodeWhereAMRan);
+
+    // Set the exit status to INVALID so that we can verify that the system
+    // automatically blacklists the node
+    makeAMContainerExit(rm, amContainerId, nm2, ContainerExitStatus.INVALID);
+
+    // Restart the AM
+    RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
+    System.out.println("New AppAttempt launched " + attempt.getAppAttemptId());
+
+    nm2.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Now the AM container should be allocated
+    MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);
+
+    MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId());
+    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
+    amContainerId =
+        ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
+    rmContainer = scheduler.getRMContainer(amContainerId);
+    nodeWhereAMRan = rmContainer.getAllocatedNode();
+
+    // The second AM should be on the same node because strict locality
+    // leaves only one eligible node, so the blacklisting threshold prevents
+    // that node from being blacklisted
+    System.out.println("AM ran on " + nodeWhereAMRan);
+    Assert.assertEquals(nm2.getNodeId(), nodeWhereAMRan);
+
+    am2.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+  }
+
+  @Test(timeout = 100000)
+  public void testNodeBlacklistingOnAMFailureRelaxedNodeLocality()
+      throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
+        true);
+
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm = startRM(conf, dispatcher);
+    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Register 5 nodes so that we can blacklist at least one if the AM
+    // container fails; with the default threshold, 5 nodes * 0.2 = 1.
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    MockNM nm2 =
+        new MockNM("127.0.0.2:2345", 8000, rm.getResourceTrackerService());
+    nm2.registerNode();
+
+    MockNM nm3 =
+        new MockNM("127.0.0.3:2345", 8000, rm.getResourceTrackerService());
+    nm3.registerNode();
+
+    MockNM nm4 =
+        new MockNM("127.0.0.4:2345", 8000, rm.getResourceTrackerService());
+    nm4.registerNode();
+
+    MockNM nm5 =
+        new MockNM("127.0.0.5:2345", 8000, rm.getResourceTrackerService());
+    nm5.registerNode();
+
+    // Specify a relaxed locality on nm2
+    List<ResourceRequest> reqs = new ArrayList<>();
+    ResourceRequest nodeReq = ResourceRequest.newInstance(
+        Priority.newInstance(0), nm2.getNodeId().getHost(),
+        Resource.newInstance(200, 1), 1, true);
+    ResourceRequest rackReq = ResourceRequest.newInstance(
+        Priority.newInstance(0), "/default-rack",
+        Resource.newInstance(200, 1), 1, true);
+    ResourceRequest anyReq = ResourceRequest.newInstance(
+        Priority.newInstance(0), ResourceRequest.ANY,
+        Resource.newInstance(200, 1), 1, true);
+    reqs.add(anyReq);
+    reqs.add(rackReq);
+    reqs.add(nodeReq);
+    RMApp app = rm.submitApp(reqs);
+
+    MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm2);
+    ContainerId amContainerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    RMContainer rmContainer = scheduler.getRMContainer(amContainerId);
+    NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();
+    Assert.assertEquals(nm2.getNodeId(), nodeWhereAMRan);
+
+    // Set the exit status to INVALID so that we can verify that the system
+    // automatically blacklists the node
+    makeAMContainerExit(rm, amContainerId, nm2, ContainerExitStatus.INVALID);
+
+    // Restart the AM
+    RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
+    System.out.println("New AppAttempt launched " + attempt.getAppAttemptId());
+
+    nm2.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    nm4.nodeHeartbeat(true);
+    nm5.nodeHeartbeat(true);
+    dispatcher.await();
+
+    // Now the AM container should be allocated
+    MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);
+
+    MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId());
+    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
+    amContainerId =
+        ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
+    rmContainer = scheduler.getRMContainer(amContainerId);
+    nodeWhereAMRan = rmContainer.getAllocatedNode();
+
+    // The second AM should be on a different node because relaxed locality
+    // makes the app schedulable on other nodes and nm2 is blacklisted
+    System.out.println("AM ran on " + nodeWhereAMRan);
+    Assert.assertNotEquals(nm2.getNodeId(), nodeWhereAMRan);
+
+    am2.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+  }
+
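The two new tests above hinge on how many nodes are applicable to the AM: with relaxed locality all five registered nodes count, so one of them (5 * 0.2, the default threshold mentioned in the comments) may be blacklisted, while strict node locality shrinks the applicable set to a single node and blacklisting it would exceed the threshold. A back-of-the-envelope sketch of that arithmetic; the comparison and rounding are illustrative, not the blacklist manager's exact code.

final class BlacklistThresholdSketch {
  // Blacklisting stays in effect only while the number of blacklisted
  // nodes does not exceed applicableNodes * disableThreshold.
  static boolean blacklistingEffective(int applicableNodes,
      int blacklistedNodes, float disableThreshold) {
    return blacklistedNodes <= (int) (applicableNodes * disableThreshold);
  }

  public static void main(String[] args) {
    // Relaxed locality: 5 applicable nodes, 5 * 0.2 = 1, so nm2 can be
    // blacklisted and the second AM lands on a different node.
    System.out.println(blacklistingEffective(5, 1, 0.2f)); // true
    // Strict locality on nm2: 1 applicable node, 1 * 0.2 = 0, so the
    // blacklist is ignored and the second AM returns to nm2.
    System.out.println(blacklistingEffective(1, 1, 0.2f)); // false
  }
}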
   @Test(timeout = 100000)
   public void testNoBlacklistingForNonSystemErrors() throws Exception {
 

+ 297 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMServerUtils.java

@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class TestRMServerUtils {
+  @Test
+  public void testGetApplicableNodeCountForAMLocality() throws Exception {
+    List<NodeId> rack1Nodes = new ArrayList<>();
+    for (int i = 0; i < 29; i++) {
+      rack1Nodes.add(NodeId.newInstance("host" + i, 1234));
+    }
+    NodeId node1 = NodeId.newInstance("node1", 1234);
+    NodeId node2 = NodeId.newInstance("node2", 1234);
+    rack1Nodes.add(node2);
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, false);
+    ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
+    Mockito.when(scheduler.getNumClusterNodes()).thenReturn(100);
+    Mockito.when(scheduler.getNodeIds("/rack1")).thenReturn(rack1Nodes);
+    Mockito.when(scheduler.getNodeIds("node1"))
+        .thenReturn(Collections.singletonList(node1));
+    Mockito.when(scheduler.getNodeIds("node2"))
+        .thenReturn(Collections.singletonList(node2));
+    RMContext rmContext = Mockito.mock(RMContext.class);
+    Mockito.when(rmContext.getScheduler()).thenReturn(scheduler);
+
+    ResourceRequest anyReq = createResourceRequest(ResourceRequest.ANY,
+        true, null);
+    List<ResourceRequest> reqs = new ArrayList<>();
+    reqs.add(anyReq);
+    Assert.assertEquals(100,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+
+    ResourceRequest rackReq = createResourceRequest("/rack1", true, null);
+    reqs.add(rackReq);
+    Assert.assertEquals(30,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    anyReq.setRelaxLocality(false);
+    Assert.assertEquals(30,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    rackReq.setRelaxLocality(false);
+    Assert.assertEquals(100,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+
+    ResourceRequest node1Req = createResourceRequest("node1", false, null);
+    reqs.add(node1Req);
+    Assert.assertEquals(100,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    node1Req.setRelaxLocality(true);
+    Assert.assertEquals(1,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    rackReq.setRelaxLocality(true);
+    Assert.assertEquals(31,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+
+    ResourceRequest node2Req = createResourceRequest("node2", false, null);
+    reqs.add(node2Req);
+    Assert.assertEquals(31,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    node2Req.setRelaxLocality(true);
+    Assert.assertEquals(31,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    rackReq.setRelaxLocality(false);
+    Assert.assertEquals(2,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    node1Req.setRelaxLocality(false);
+    Assert.assertEquals(1,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    node2Req.setRelaxLocality(false);
+    Assert.assertEquals(100,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+  }
+
+  @Test
+  public void testGetApplicableNodeCountForAMLabels() throws Exception {
+    Set<NodeId> noLabelNodes = new HashSet<>();
+    for (int i = 0; i < 80; i++) {
+      noLabelNodes.add(NodeId.newInstance("host" + i, 1234));
+    }
+    Set<NodeId> label1Nodes = new HashSet<>();
+    for (int i = 80; i < 90; i++) {
+      label1Nodes.add(NodeId.newInstance("host" + i, 1234));
+    }
+    label1Nodes.add(NodeId.newInstance("host101", 0));
+    label1Nodes.add(NodeId.newInstance("host102", 0));
+    Map<String, Set<NodeId>> label1NodesMap = new HashMap<>();
+    label1NodesMap.put("label1", label1Nodes);
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
+    Mockito.when(scheduler.getNumClusterNodes()).thenReturn(100);
+    RMContext rmContext = Mockito.mock(RMContext.class);
+    Mockito.when(rmContext.getScheduler()).thenReturn(scheduler);
+    RMNodeLabelsManager labMan = Mockito.mock(RMNodeLabelsManager.class);
+    Mockito.when(labMan.getNodesWithoutALabel()).thenReturn(noLabelNodes);
+    Mockito.when(labMan.getLabelsToNodes(Collections.singleton("label1")))
+        .thenReturn(label1NodesMap);
+    Mockito.when(rmContext.getNodeLabelManager()).thenReturn(labMan);
+
+    ResourceRequest anyReq = createResourceRequest(ResourceRequest.ANY,
+        true, null);
+    List<ResourceRequest> reqs = new ArrayList<>();
+    reqs.add(anyReq);
+    Assert.assertEquals(80,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    anyReq.setNodeLabelExpression("label1");
+    Assert.assertEquals(10,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+  }
+
+  @Test
+  public void testGetApplicableNodeCountForAMLocalityAndLabels()
+      throws Exception {
+    List<NodeId> rack1Nodes = new ArrayList<>();
+    for (int i = 0; i < 29; i++) {
+      rack1Nodes.add(NodeId.newInstance("host" + i, 1234));
+    }
+    NodeId node1 = NodeId.newInstance("node1", 1234);
+    NodeId node2 = NodeId.newInstance("node2", 1234);
+    rack1Nodes.add(node2);
+    Set<NodeId> noLabelNodes = new HashSet<>();
+    for (int i = 0; i < 19; i++) {
+      noLabelNodes.add(rack1Nodes.get(i));
+    }
+    noLabelNodes.add(node2);
+    for (int i = 29; i < 89; i++) {
+      noLabelNodes.add(NodeId.newInstance("host" + i, 1234));
+    }
+    Set<NodeId> label1Nodes = new HashSet<>();
+    label1Nodes.add(node1);
+    for (int i = 89; i < 93; i++) {
+      label1Nodes.add(NodeId.newInstance("host" + i, 1234));
+    }
+    for (int i = 19; i < 29; i++) {
+      label1Nodes.add(rack1Nodes.get(i));
+    }
+    label1Nodes.add(NodeId.newInstance("host101", 0));
+    label1Nodes.add(NodeId.newInstance("host102", 0));
+    Map<String, Set<NodeId>> label1NodesMap = new HashMap<>();
+    label1NodesMap.put("label1", label1Nodes);
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
+    Mockito.when(scheduler.getNumClusterNodes()).thenReturn(100);
+    Mockito.when(scheduler.getNodeIds("/rack1")).thenReturn(rack1Nodes);
+    Mockito.when(scheduler.getNodeIds("node1"))
+        .thenReturn(Collections.singletonList(node1));
+    Mockito.when(scheduler.getNodeIds("node2"))
+        .thenReturn(Collections.singletonList(node2));
+    RMContext rmContext = Mockito.mock(RMContext.class);
+    Mockito.when(rmContext.getScheduler()).thenReturn(scheduler);
+    RMNodeLabelsManager labMan = Mockito.mock(RMNodeLabelsManager.class);
+    Mockito.when(labMan.getNodesWithoutALabel()).thenReturn(noLabelNodes);
+    Mockito.when(labMan.getLabelsToNodes(Collections.singleton("label1")))
+        .thenReturn(label1NodesMap);
+    Mockito.when(rmContext.getNodeLabelManager()).thenReturn(labMan);
+
+    ResourceRequest anyReq = createResourceRequest(ResourceRequest.ANY,
+        true, null);
+    List<ResourceRequest> reqs = new ArrayList<>();
+    reqs.add(anyReq);
+    Assert.assertEquals(80,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+
+    ResourceRequest rackReq = createResourceRequest("/rack1", true, null);
+    reqs.add(rackReq);
+    Assert.assertEquals(20,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    anyReq.setRelaxLocality(false);
+    Assert.assertEquals(20,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    rackReq.setRelaxLocality(false);
+    Assert.assertEquals(80,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+
+    ResourceRequest node1Req = createResourceRequest("node1", false, null);
+    reqs.add(node1Req);
+    Assert.assertEquals(80,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    node1Req.setRelaxLocality(true);
+    Assert.assertEquals(0,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    rackReq.setRelaxLocality(true);
+    Assert.assertEquals(20,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+
+    ResourceRequest node2Req = createResourceRequest("node2", false, null);
+    reqs.add(node2Req);
+    Assert.assertEquals(20,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    node2Req.setRelaxLocality(true);
+    Assert.assertEquals(20,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    rackReq.setRelaxLocality(false);
+    Assert.assertEquals(1,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    node1Req.setRelaxLocality(false);
+    Assert.assertEquals(1,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    node2Req.setRelaxLocality(false);
+    Assert.assertEquals(80,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+
+    anyReq.setNodeLabelExpression("label1");
+    rackReq.setNodeLabelExpression("label1");
+    node1Req.setNodeLabelExpression("label1");
+    node2Req.setNodeLabelExpression("label1");
+    anyReq.setRelaxLocality(true);
+    reqs = new ArrayList<>();
+    reqs.add(anyReq);
+    Assert.assertEquals(15,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+
+    rackReq.setRelaxLocality(true);
+    reqs.add(rackReq);
+    Assert.assertEquals(10,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    anyReq.setRelaxLocality(false);
+    Assert.assertEquals(10,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    rackReq.setRelaxLocality(false);
+    Assert.assertEquals(15,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+
+    node1Req.setRelaxLocality(false);
+    reqs.add(node1Req);
+    Assert.assertEquals(15,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    node1Req.setRelaxLocality(true);
+    Assert.assertEquals(1,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    rackReq.setRelaxLocality(true);
+    Assert.assertEquals(11,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+
+    node2Req.setRelaxLocality(false);
+    reqs.add(node2Req);
+    Assert.assertEquals(11,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    node2Req.setRelaxLocality(true);
+    Assert.assertEquals(11,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    rackReq.setRelaxLocality(false);
+    Assert.assertEquals(1,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    node1Req.setRelaxLocality(false);
+    Assert.assertEquals(0,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+    node2Req.setRelaxLocality(false);
+    Assert.assertEquals(15,
+        RMServerUtils.getApplicableNodeCountForAM(rmContext, conf, reqs));
+  }
+
+  private ResourceRequest createResourceRequest(String resource,
+      boolean relaxLocality, String nodeLabel) {
+    return ResourceRequest.newInstance(Priority.newInstance(0),
+        resource, Resource.newInstance(1, 1), 1, relaxLocality, nodeLabel);
+  }
+}
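A rough reading of what the unlabeled assertions above encode: node- and rack-level AM requests with relaxLocality=true restrict the applicable set to the union of their nodes (via the getNodeIds lookup mocked above), and when none of them are relaxed the whole cluster applies; the ANY request's own relaxLocality does not change the count. The sketch below captures only that rule, deliberately ignoring node labels and inactive nodes, and is not the RMServerUtils implementation.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;

final class ApplicableNodeCountSketch {
  static int count(ResourceScheduler scheduler, List<ResourceRequest> reqs) {
    Set<NodeId> nodes = new HashSet<>();
    boolean restricted = false;
    for (ResourceRequest req : reqs) {
      // Only non-ANY requests that relax locality restrict AM placement.
      if (!ResourceRequest.ANY.equals(req.getResourceName())
          && req.getRelaxLocality()) {
        restricted = true;
        nodes.addAll(scheduler.getNodeIds(req.getResourceName()));
      }
    }
    return restricted ? nodes.size() : scheduler.getNumClusterNodes();
  }
}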

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java

@@ -57,7 +57,7 @@ import com.google.common.collect.Lists;
 public abstract class MockAsm extends MockApps {
 
   public static class ApplicationBase implements RMApp {
-    ResourceRequest amReq;
+    List<ResourceRequest> amReqs;
     @Override
     public String getUser() {
       throw new UnsupportedOperationException("Not supported yet.");
@@ -192,8 +192,8 @@ public abstract class MockAsm extends MockApps {
     }
     
     @Override
-    public ResourceRequest getAMResourceRequest() {
-      return this.amReq; 
+    public List<ResourceRequest> getAMResourceRequests() {
+      return this.amReqs;
     }
 
     @Override

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java

@@ -525,7 +525,8 @@ public class TestSystemMetricsPublisher {
     when(app.getAppNodeLabelExpression()).thenCallRealMethod();
     ResourceRequest amReq = mock(ResourceRequest.class);
     when(amReq.getNodeLabelExpression()).thenReturn("high-mem");
-    when(app.getAMResourceRequest()).thenReturn(amReq);
+    when(app.getAMResourceRequests())
+        .thenReturn(Collections.singletonList(amReq));
     when(app.getAmNodeLabelExpression()).thenCallRealMethod();
     when(app.getApplicationPriority()).thenReturn(Priority.newInstance(10));
     when(app.getCallerContext())

+ 6 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -62,14 +63,14 @@ public class MockRMApp implements RMApp {
   StringBuilder diagnostics = new StringBuilder();
   RMAppAttempt attempt;
   int maxAppAttempts = 1;
-  ResourceRequest amReq;
+  List<ResourceRequest> amReqs;
 
   public MockRMApp(int newid, long time, RMAppState newState) {
     finish = time;
     id = MockApps.newAppID(newid);
     state = newState;
-    amReq = ResourceRequest.newInstance(Priority.UNDEFINED, "0.0.0.0",
-        Resource.newInstance(0, 0), 1);
+    amReqs = Collections.singletonList(ResourceRequest.newInstance(
+        Priority.UNDEFINED, "0.0.0.0", Resource.newInstance(0, 0), 1));
   }
 
   public MockRMApp(int newid, long time, RMAppState newState, String userName) {
@@ -276,8 +277,8 @@ public class MockRMApp implements RMApp {
   }
   
   @Override
-  public ResourceRequest getAMResourceRequest() {
-    return this.amReq; 
+  public List<ResourceRequest> getAMResourceRequests() {
+    return this.amReqs;
   }
 
   @Override

+ 6 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

@@ -30,8 +30,10 @@ import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -267,7 +269,8 @@ public class TestRMAppTransitions {
     submissionContext.setAMContainerSpec(mock(ContainerLaunchContext.class));
     RMApp application = new RMAppImpl(applicationId, rmContext, conf, name,
         user, queue, submissionContext, scheduler, masterService,
-        System.currentTimeMillis(), "YARN", null, mock(ResourceRequest.class));
+        System.currentTimeMillis(), "YARN", null,
+        new ArrayList<ResourceRequest>());
 
     testAppStartState(applicationId, user, name, queue, application);
     this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),
@@ -1020,9 +1023,9 @@ public class TestRMAppTransitions {
             submissionContext.getQueue(), submissionContext, scheduler, null,
             appState.getSubmitTime(), submissionContext.getApplicationType(),
             submissionContext.getApplicationTags(),
-            BuilderUtils.newResourceRequest(
+            Collections.singletonList(BuilderUtils.newResourceRequest(
                 RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
-                submissionContext.getResource(), 1));
+                submissionContext.getResource(), 1)));
     Assert.assertEquals(RMAppState.NEW, application.getState());
 
     RMAppEvent recoverEvent =

+ 10 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java

@@ -328,9 +328,9 @@ public class TestRMAppAttemptTransitions {
     applicationAttempt =
         new RMAppAttemptImpl(applicationAttemptId, spyRMContext, scheduler,
             masterService, submissionContext, new Configuration(),
-            BuilderUtils.newResourceRequest(
+            Collections.singletonList(BuilderUtils.newResourceRequest(
                 RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
-                submissionContext.getResource(), 1), application);
+                submissionContext.getResource(), 1)), application);
 
     when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
     when(application.getApplicationId()).thenReturn(applicationId);
@@ -1108,9 +1108,9 @@ public class TestRMAppAttemptTransitions {
         new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(),
             spyRMContext, scheduler,masterService,
             submissionContext, myConf,
-            BuilderUtils.newResourceRequest(
+            Collections.singletonList(BuilderUtils.newResourceRequest(
                 RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
-                submissionContext.getResource(), 1), application);
+                submissionContext.getResource(), 1)), application);
 
     //submit, schedule and allocate app attempt
     myApplicationAttempt.handle(
@@ -1584,9 +1584,9 @@ public class TestRMAppAttemptTransitions {
     applicationAttempt =
         new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(), spyRMContext,
           scheduler, masterService, submissionContext, new Configuration(),
-          BuilderUtils.newResourceRequest(
+            Collections.singletonList(BuilderUtils.newResourceRequest(
               RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
-              submissionContext.getResource(), 1), application);
+              submissionContext.getResource(), 1)), application);
     when(submissionContext.getKeepContainersAcrossApplicationAttempts())
       .thenReturn(true);
     when(submissionContext.getMaxAppAttempts()).thenReturn(1);
@@ -1645,9 +1645,10 @@ public class TestRMAppAttemptTransitions {
     applicationAttempt =
         new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(),
             spyRMContext, scheduler, masterService, submissionContext,
-            new Configuration(), ResourceRequest.newInstance(
-                Priority.UNDEFINED, "host1", Resource.newInstance(3333, 1), 3,
-                false, "label-expression"), application);
+            new Configuration(), Collections.singletonList(
+                ResourceRequest.newInstance(Priority.UNDEFINED, "host1",
+                    Resource.newInstance(3333, 1), 3,
+                false, "label-expression")), application);
     new RMAppAttemptImpl.ScheduleTransition().transition(
         (RMAppAttemptImpl) applicationAttempt, null);
   }

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java

@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -613,7 +614,8 @@ public class TestApplicationLimits {
     ResourceRequest amResourceRequest = mock(ResourceRequest.class);
     Resource amResource = Resources.createResource(0, 0);
     when(amResourceRequest.getCapability()).thenReturn(amResource);
-    when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
+    when(rmApp.getAMResourceRequests()).thenReturn(
+        Collections.singletonList(amResourceRequest));
     Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
     when(spyRMContext.getRMApps()).thenReturn(spyApps);
     RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java

@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -639,7 +640,8 @@ public class TestApplicationLimitsByPartition {
     ResourceRequest amResourceRequest = mock(ResourceRequest.class);
     Resource amResource = Resources.createResource(0, 0);
     when(amResourceRequest.getCapability()).thenReturn(amResource);
-    when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
+    when(rmApp.getAMResourceRequests()).thenReturn(
+        Collections.singletonList(amResourceRequest));
     Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
     when(spyRMContext.getRMApps()).thenReturn(spyApps);
     RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -3203,8 +3203,8 @@ public class TestCapacityScheduler {
     RMApp rmApp = rm.submitApp(amMemory, "app-1", "user_0", null, queueName);
        
     assertEquals("RMApp does not containes minimum allocation",
-        minAllocResource, rmApp.getAMResourceRequest().getCapability());
-    
+        minAllocResource, rmApp.getAMResourceRequests().get(0).getCapability());
+
     ResourceScheduler scheduler = rm.getRMContext().getScheduler();
     LeafQueue queueA =
         (LeafQueue) ((CapacityScheduler) scheduler).getQueue(queueName);

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -153,7 +153,8 @@ public class TestLeafQueue {
     amResourceRequest = mock(ResourceRequest.class);
     when(amResourceRequest.getCapability()).thenReturn(
       Resources.createResource(0, 0));
-    when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
+    when(rmApp.getAMResourceRequests()).thenReturn(
+        Collections.singletonList(amResourceRequest));
     Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
     when(spyRMContext.getRMApps()).thenReturn(spyApps);
     

+ 78 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -3206,6 +3206,84 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals(1, app.getLiveContainers().size());
   }
 
+  @Test
+  public void testAMStrictLocalityRack() throws IOException {
+    testAMStrictLocality(false, false);
+  }
+
+  @Test
+  public void testAMStrictLocalityNode() throws IOException {
+    testAMStrictLocality(true, false);
+  }
+
+  @Test
+  public void testAMStrictLocalityRackInvalid() throws IOException {
+    testAMStrictLocality(false, true);
+  }
+
+  @Test
+  public void testAMStrictLocalityNodeInvalid() throws IOException {
+    testAMStrictLocality(true, true);
+  }
+
+  private void testAMStrictLocality(boolean node, boolean invalid)
+      throws IOException {
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1,
+        "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 = MockNodes.newNodeInfo(2, Resources.createResource(1024), 2,
+        "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    List<ResourceRequest> reqs = new ArrayList<>();
+    ResourceRequest nodeRequest = createResourceRequest(1024,
+        node2.getHostName(), 1, 1, true);
+    if (node && invalid) {
+      nodeRequest.setResourceName("invalid");
+    }
+    ResourceRequest rackRequest = createResourceRequest(1024,
+        node2.getRackName(), 1, 1, !node);
+    if (!node && invalid) {
+      rackRequest.setResourceName("invalid");
+    }
+    ResourceRequest anyRequest = createResourceRequest(1024,
+        ResourceRequest.ANY, 1, 1, false);
+    reqs.add(anyRequest);
+    reqs.add(rackRequest);
+    if (node) {
+      reqs.add(nodeRequest);
+    }
+
+    ApplicationAttemptId attId1 =
+        createSchedulingRequest("queue1", "user1", reqs);
+
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent node2UpdateEvent =
+        new NodeUpdateSchedulerEvent(node2);
+
+    FSAppAttempt app = scheduler.getSchedulerApp(attId1);
+
+    // node2 should get the container, unless the request named an
+    // invalid resource
+    scheduler.handle(node2UpdateEvent);
+    if (invalid) {
+      assertEquals(0, app.getLiveContainers().size());
+      assertEquals(0, scheduler.getNode(node2.getNodeID()).getNumContainers());
+      assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumContainers());
+    } else {
+      assertEquals(1, app.getLiveContainers().size());
+      assertEquals(1, scheduler.getNode(node2.getNodeID()).getNumContainers());
+      assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumContainers());
+    }
+  }
+
   /**
    * Strict locality requests shouldn't reserve resources on another node.
    */

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java

@@ -1168,8 +1168,8 @@ public class TestRMWebServicesApps extends JerseyTestBase {
     assertEquals(app1.getApplicationId().toString(), appInfo.getAppId());
     assertEquals(app1.getName(), appInfo.getName());
     assertEquals(app1.createApplicationState(), appInfo.getState());
-    assertEquals(app1.getAMResourceRequest().getCapability().getMemorySize(),
-        appInfo.getAllocatedMB());
+    assertEquals(app1.getAMResourceRequests().get(0).getCapability()
+            .getMemorySize(), appInfo.getAllocatedMB());
 
     rm.stop();
   }
@@ -1378,7 +1378,7 @@ public class TestRMWebServicesApps extends JerseyTestBase {
       expectedNumberOfElements++;
       appNodeLabelExpression = info.getString("appNodeLabelExpression");
     }
-    if (app.getAMResourceRequest().getNodeLabelExpression() != null) {
+    if (app.getAMResourceRequests().get(0).getNodeLabelExpression() != null) {
       expectedNumberOfElements++;
       amNodeLabelExpression = info.getString("amNodeLabelExpression");
     }
@@ -1485,7 +1485,7 @@ public class TestRMWebServicesApps extends JerseyTestBase {
         app.getApplicationSubmissionContext().getNodeLabelExpression(),
         appNodeLabelExpression);
     assertEquals("unmanagedApplication doesn't match",
-        app.getAMResourceRequest().getNodeLabelExpression(),
+        app.getAMResourceRequests().get(0).getNodeLabelExpression(),
         amNodeLabelExpression);
     assertEquals("amRPCAddress",
         AppInfo.getAmRPCAddressFromRMAppAttempt(app.getCurrentAppAttempt()),
@@ -1512,7 +1512,7 @@ public class TestRMWebServicesApps extends JerseyTestBase {
       String nodeLabelExpression, int numContainers, boolean relaxLocality,
       int priority, String resourceName, long memory, long vCores,
       String executionType, boolean enforceExecutionType) {
-    ResourceRequest request = app.getAMResourceRequest();
+    ResourceRequest request = app.getAMResourceRequests().get(0);
     assertEquals("nodeLabelExpression doesn't match",
         request.getNodeLabelExpression(), nodeLabelExpression);
     assertEquals("numContainers doesn't match", request.getNumContainers(),