Просмотр исходного кода

YARN-1008. MiniYARNCluster with multiple nodemanagers, all nodes have same key for allocations. (tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1517567 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur 11 лет назад
Родитель
Сommit
81e85bc8f6
21 измененных файлов с 156 добавлено и 37 удалено
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
  4. 11 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
  5. 5 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  6. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
  7. 4 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  8. 9 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
  9. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java
  10. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
  11. 9 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
  12. 4 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  13. 6 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
  14. 7 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  15. 9 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
  16. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
  17. 1 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
  18. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
  19. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
  20. 50 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
  21. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -61,6 +61,9 @@ Release 2.1.1-beta - UNRELEASED
     YARN-1094. Fixed a blocker with RM restart code because of which RM crashes
     when try to recover an existing app. (vinodkv)
 
+    YARN-1008. MiniYARNCluster with multiple nodemanagers, all nodes have same 
+    key for allocations. (tucu)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -132,6 +132,15 @@ public class YarnConfiguration extends Configuration {
     RM_PREFIX + "scheduler.client.thread-count";
   public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50;
 
+  /** If the port should be included or not in the node name. The node name
+   * is used by the scheduler for resource requests allocation location 
+   * matching. Typically this is just the hostname, using the port is needed
+   * when using minicluster and specific NM are required.*/
+  public static final String RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME =
+      YARN_PREFIX + "scheduler.include-port-in-node-name";
+  public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = 
+      false;
+
   /**
    * Enable periodic monitor threads.
    * @see #RM_SCHEDULER_MONITOR_POLICIES

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -281,7 +281,7 @@ public class AppSchedulingInfo {
     // Update future requirements
     nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
     if (nodeLocalRequest.getNumContainers() == 0) {
-      this.requests.get(priority).remove(node.getHostName());
+      this.requests.get(priority).remove(node.getNodeName());
     }
 
     ResourceRequest rackLocalRequest = requests.get(priority).get(

+ 11 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 /**
  * Represents a YARN Cluster Node from the viewpoint of the scheduler.
@@ -30,10 +31,17 @@ import org.apache.hadoop.yarn.api.records.Resource;
 public abstract class SchedulerNode {
 
   /**
-   * Get hostname.
-   * @return hostname
+   * Get the name of the node for scheduling matching decisions.
+   * <p/>
+   * Typically this is the 'hostname' reported by the node, but it could be 
+   * configured to be 'hostname:port' reported by the node via the 
+   * {@link YarnConfiguration#RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} constant.
+   * The main usecase of this is Yarn minicluster to be able to differentiate
+   * node manager instances by their port number.
+   * 
+   * @return name of the node for scheduling matching decisions.
    */
-  public abstract String getHostName();
+  public abstract String getNodeName();
   
   /**
    * Get rackname.

+ 5 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -185,7 +185,8 @@ public class CapacityScheduler
   private boolean initialized = false;
 
   private ResourceCalculator calculator;
-  
+  private boolean usePortForNodeName;
+
   public CapacityScheduler() {}
 
   @Override
@@ -256,6 +257,7 @@ public class CapacityScheduler
       this.minimumAllocation = this.conf.getMinimumAllocation();
       this.maximumAllocation = this.conf.getMaximumAllocation();
       this.calculator = this.conf.getResourceCalculator();
+      this.usePortForNodeName = this.conf.getUsePortForNodeName();
 
       this.rmContext = rmContext;
       
@@ -759,7 +761,8 @@ public class CapacityScheduler
   }
 
   private synchronized void addNode(RMNode nodeManager) {
-    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager));
+    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
+        usePortForNodeName));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
     ++numNodeManagers;

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

@@ -338,6 +338,11 @@ public class CapacitySchedulerConfiguration extends Configuration {
         this);
   }
 
+  public boolean getUsePortForNodeName() {
+    return getBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
+  }
+
   public void setResourceComparator(
       Class<? extends ResourceCalculator> resourceCalculatorClass) {
     setClass(

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -801,7 +801,7 @@ public class LeafQueue implements CSQueue {
   assignContainers(Resource clusterResource, FiCaSchedulerNode node) {
 
     if(LOG.isDebugEnabled()) {
-      LOG.debug("assignContainers: node=" + node.getHostName()
+      LOG.debug("assignContainers: node=" + node.getNodeName()
         + " #applications=" + activeApplications.size());
     }
     
@@ -1130,7 +1130,7 @@ public class LeafQueue implements CSQueue {
 
     // Data-local
     ResourceRequest nodeLocalResourceRequest =
-        application.getResourceRequest(priority, node.getHostName());
+        application.getResourceRequest(priority, node.getNodeName());
     if (nodeLocalResourceRequest != null) {
       assigned = 
           assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, 
@@ -1257,7 +1257,7 @@ public class LeafQueue implements CSQueue {
     if (type == NodeType.NODE_LOCAL) {
       // Now check if we need containers on this host...
       ResourceRequest nodeLocalRequest = 
-        application.getResourceRequest(priority, node.getHostName());
+        application.getResourceRequest(priority, node.getNodeName());
       if (nodeLocalRequest != null) {
         return nodeLocalRequest.getNumContainers() > 0;
       }
@@ -1302,7 +1302,7 @@ public class LeafQueue implements CSQueue {
       FiCaSchedulerApp application, Priority priority, 
       ResourceRequest request, NodeType type, RMContainer rmContainer) {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("assignContainers: node=" + node.getHostName()
+      LOG.debug("assignContainers: node=" + node.getNodeName()
         + " application=" + application.getApplicationId().getId()
         + " priority=" + priority.getPriority()
         + " request=" + request + " type=" + type);

+ 9 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java

@@ -59,11 +59,17 @@ public class FiCaSchedulerNode extends SchedulerNode {
     new HashMap<ContainerId, RMContainer>();
   
   private final RMNode rmNode;
+  private final String nodeName;
 
-  public FiCaSchedulerNode(RMNode node) {
+  public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) {
     this.rmNode = node;
     this.availableResource.setMemory(node.getTotalCapability().getMemory());
     this.availableResource.setVirtualCores(node.getTotalCapability().getVirtualCores());
+    if (usePortForNodeName) {
+      nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
+    } else {
+      nodeName = rmNode.getHostName();
+    }
   }
 
   public RMNode getRMNode() {
@@ -79,8 +85,8 @@ public class FiCaSchedulerNode extends SchedulerNode {
   }
 
   @Override
-  public String getHostName() {
-    return this.rmNode.getHostName();
+  public String getNodeName() {
+    return nodeName;
   }
 
   @Override

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerUtils.java

@@ -24,9 +24,9 @@ public class FiCaSchedulerUtils {
 
   public static  boolean isBlacklisted(FiCaSchedulerApp application,
       FiCaSchedulerNode node, Log LOG) {
-    if (application.isBlacklisted(node.getHostName())) {
+    if (application.isBlacklisted(node.getNodeName())) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping 'host' " + node.getHostName() + 
+        LOG.debug("Skipping 'host' " + node.getNodeName() + 
             " for " + application.getApplicationId() + 
             " since it has been blacklisted");
       }

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java

@@ -185,7 +185,7 @@ public class AppSchedulable extends Schedulable {
    */
   private void reserve(Priority priority, FSSchedulerNode node,
       Container container, boolean alreadyReserved) {
-    LOG.info("Making reservation: node=" + node.getHostName() +
+    LOG.info("Making reservation: node=" + node.getNodeName() +
                                  " app_id=" + app.getApplicationId());
     if (!alreadyReserved) {
       getMetrics().reserveResource(app.getUser(), container.getResource());
@@ -309,7 +309,7 @@ public class AppSchedulable extends Schedulable {
         ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
             node.getRackName());
         ResourceRequest localRequest = app.getResourceRequest(priority,
-            node.getHostName());
+            node.getNodeName());
         
         if (localRequest != null && !localRequest.getRelaxLocality()) {
           LOG.warn("Relax locality off is not supported on local request: "
@@ -369,7 +369,7 @@ public class AppSchedulable extends Schedulable {
   public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
     ResourceRequest anyRequest = app.getResourceRequest(prio, ResourceRequest.ANY);
     ResourceRequest rackRequest = app.getResourceRequest(prio, node.getRackName());
-    ResourceRequest nodeRequest = app.getResourceRequest(prio, node.getHostName());
+    ResourceRequest nodeRequest = app.getResourceRequest(prio, node.getNodeName());
 
     return
         // There must be outstanding requests at the given priority:

+ 9 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java

@@ -63,10 +63,16 @@ public class FSSchedulerNode extends SchedulerNode {
     new HashMap<ContainerId, RMContainer>();
   
   private final RMNode rmNode;
+  private final String nodeName;
 
-  public FSSchedulerNode(RMNode node) {
+  public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
     this.rmNode = node;
     this.availableResource = Resources.clone(node.getTotalCapability());
+    if (usePortForNodeName) {
+      nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
+    } else {
+      nodeName = rmNode.getHostName();
+    }
   }
 
   public RMNode getRMNode() {
@@ -82,8 +88,8 @@ public class FSSchedulerNode extends SchedulerNode {
   }
 
   @Override
-  public String getHostName() {
-    return rmNode.getHostName();
+  public String getNodeName() {
+    return nodeName;
   }
 
   @Override

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -35,7 +35,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -122,6 +121,7 @@ public class FairScheduler implements ResourceScheduler {
   private Resource incrAllocation;
   private QueueManager queueMgr;
   private Clock clock;
+  private boolean usePortForNodeName;
 
   private static final Log LOG = LogFactory.getLog(FairScheduler.class);
   
@@ -742,7 +742,7 @@ public class FairScheduler implements ResourceScheduler {
   }
 
   private synchronized void addNode(RMNode node) {
-    nodes.put(node.getNodeID(), new FSSchedulerNode(node));
+    nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName));
     Resources.addTo(clusterCapacity, node.getTotalCapability());
     updateRootQueueMetrics();
 
@@ -1056,7 +1056,8 @@ public class FairScheduler implements ResourceScheduler {
     sizeBasedWeight = this.conf.getSizeBasedWeight();
     preemptionInterval = this.conf.getPreemptionInterval();
     waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
-
+    usePortForNodeName = this.conf.getUsePortForNodeName();
+    
     if (!initialized) {
       rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
       this.rmContext = rmContext;

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java

@@ -166,7 +166,12 @@ public class FairSchedulerConfiguration extends Configuration {
   public int getWaitTimeBeforeKill() {
     return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
   }
-  
+
+  public boolean getUsePortForNodeName() {
+    return getBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
+  }
+
   /**
    * Parses a resource config value of a form like "1024", "1024 mb",
    * or "1024 mb, 3 vcores". If no units are given, megabytes are assumed.

+ 7 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -111,6 +111,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
   private boolean initialized;
   private Resource minimumAllocation;
   private Resource maximumAllocation;
+  private boolean usePortForNodeName;
 
   private Map<ApplicationAttemptId, FiCaSchedulerApp> applications
       = new TreeMap<ApplicationAttemptId, FiCaSchedulerApp>();
@@ -233,6 +234,9 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
         Resources.createResource(conf.getInt(
             YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
             YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+      this.usePortForNodeName = conf.getBoolean(
+          YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, 
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
       this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
           conf);
       this.activeUsersManager = new ActiveUsersManager(metrics);
@@ -490,7 +494,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
       FiCaSchedulerApp application, Priority priority) {
     int assignedContainers = 0;
     ResourceRequest request = 
-      application.getResourceRequest(priority, node.getHostName());
+      application.getResourceRequest(priority, node.getNodeName());
     if (request != null) {
       // Don't allocate on this node if we don't need containers on this rack
       ResourceRequest rackRequest =
@@ -801,7 +805,8 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
   }
 
   private synchronized void addNode(RMNode nodeManager) {
-    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager));
+    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
+        usePortForNodeName));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
   }
 

+ 9 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -200,15 +200,14 @@ public class MockNodes {
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) {
-    return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null);
+    return buildRMNode(rack, perNode, state, httpAddr, NODE_ID++, null, 123);
   }
 
   private static RMNode buildRMNode(int rack, final Resource perNode,
-      NodeState state, String httpAddr, int hostnum, String hostName) {
+      NodeState state, String httpAddr, int hostnum, String hostName, int port) {
     final String rackName = "rack"+ rack;
     final int nid = hostnum;
     final String nodeAddr = hostName + ":" + nid;
-    final int port = 123;
     if (hostName == null) {
       hostName = "host"+ nid;
     }
@@ -230,12 +229,17 @@ public class MockNodes {
   }
 
   public static RMNode newNodeInfo(int rack, final Resource perNode, int hostnum) {
-    return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null);
+    return buildRMNode(rack, perNode, null, "localhost:0", hostnum, null, 123);
   }
   
   public static RMNode newNodeInfo(int rack, final Resource perNode,
       int hostnum, String hostName) {
-    return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName);
+    return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, 123);
+  }
+
+  public static RMNode newNodeInfo(int rack, final Resource perNode,
+      int hostnum, String hostName, int port) {
+    return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, port);
   }
 
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java

@@ -101,7 +101,7 @@ public class NodeManager implements ContainerManagementProtocol {
     request.setNodeId(this.nodeId);
     resourceTrackerService.registerNodeManager(request);
     this.schedulerNode = new FiCaSchedulerNode(rmContext.getRMNodes().get(
-        this.nodeId));
+        this.nodeId), false);
    
     // Sanity check
     Assert.assertEquals(capability.getMemory(), 

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java

@@ -26,7 +26,6 @@ import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
@@ -126,7 +125,7 @@ public class TestChildQueueOrder {
           throw new Exception();
         } catch (Exception e) {
           LOG.info("FOOBAR q.assignContainers q=" + queue.getQueueName() + 
-              " alloc=" + allocation + " node=" + node.getHostName());
+              " alloc=" + allocation + " node=" + node.getNodeName());
         }
         final Resource allocatedResource = Resources.createResource(allocation);
         if (queue instanceof ParentQueue) {

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java

@@ -138,7 +138,7 @@ public class TestParentQueue {
           throw new Exception();
         } catch (Exception e) {
           LOG.info("FOOBAR q.assignContainers q=" + queue.getQueueName() + 
-              " alloc=" + allocation + " node=" + node.getHostName());
+              " alloc=" + allocation + " node=" + node.getNodeName());
         }
         final Resource allocatedResource = Resources.createResource(allocation);
         if (queue instanceof ParentQueue) {

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java

@@ -160,7 +160,7 @@ public class TestUtils {
     when(rmNode.getHostName()).thenReturn(host);
     when(rmNode.getRackName()).thenReturn(rack);
     
-    FiCaSchedulerNode node = spy(new FiCaSchedulerNode(rmNode));
+    FiCaSchedulerNode node = spy(new FiCaSchedulerNode(rmNode, false));
     LOG.info("node = " + host + " avail=" + node.getAvailableResource());
     return node;
   }

+ 50 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -2125,4 +2125,54 @@ public class TestFairScheduler {
     Assert.assertEquals(2, app3.getLiveContainers().size());
     Assert.assertEquals(2, app4.getLiveContainers().size());
   }
+
+  @Test(timeout = 30000)
+  public void testHostPortNodeName() throws Exception {
+    scheduler.getConf().setBoolean(YarnConfiguration
+        .RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
+    scheduler.reinitialize(scheduler.getConf(), 
+        resourceManager.getRMContext());
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024),
+        1, "127.0.0.1", 1);
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024),
+        2, "127.0.0.1", 2);
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", 
+        "user1", 0);
+
+    ResourceRequest nodeRequest = createResourceRequest(1024, 
+        node1.getNodeID().getHost() + ":" + node1.getNodeID().getPort(), 1,
+        1, true);
+    ResourceRequest rackRequest = createResourceRequest(1024, 
+        node1.getRackName(), 1, 1, false);
+    ResourceRequest anyRequest = createResourceRequest(1024, 
+        ResourceRequest.ANY, 1, 1, false);
+    createSchedulingRequestExistingApplication(nodeRequest, attId1);
+    createSchedulingRequestExistingApplication(rackRequest, attId1);
+    createSchedulingRequestExistingApplication(anyRequest, attId1);
+
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent node1UpdateEvent = new 
+        NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent node2UpdateEvent = new 
+        NodeUpdateSchedulerEvent(node2);
+
+    // no matter how many heartbeats, node2 should never get a container  
+    FSSchedulerApp app = scheduler.applications.get(attId1);
+    for (int i = 0; i < 10; i++) {
+      scheduler.handle(node2UpdateEvent);
+      assertEquals(0, app.getLiveContainers().size());
+      assertEquals(0, app.getReservedContainers().size());
+    }
+    // then node1 should get the container  
+    scheduler.handle(node1UpdateEvent);
+    assertEquals(1, app.getLiveContainers().size());
+  }
+
 }

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

@@ -53,6 +53,21 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
 
+/**
+ * Embedded Yarn minicluster for testcases that need to interact with a cluster.
+ * <p/>
+ * In a real cluster, resource request matching is done using the hostname, and
+ * by default Yarn minicluster works in the exact same way as a real cluster.
+ * <p/>
+ * If a testcase needs to use multiple nodes and exercise resource request 
+ * matching to a specific node, then the property 
+ * {@YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} should be set
+ * <code>true</code> in the configuration used to initialize the minicluster.
+ * <p/>
+ * With this property set to <code>true</code>, the matching will be done using
+ * the <code>hostname:port</code> of the namenodes. In such case, the AM must
+ * do resource request using <code>hostname:port</code> as the location.
+ */
 public class MiniYARNCluster extends CompositeService {
 
   private static final Log LOG = LogFactory.getLog(MiniYARNCluster.class);