Browse Source

YARN-7258. Add Node and Rack Hints to Opportunistic Scheduler. (Kartheek Muthyala via asuresh).

(cherry picked from commit b733348dde18a242e6c9074c512116a8baf1d281)
Arun Suresh 7 years ago
parent
commit
cb91bb3c96
11 changed files with 998 additions and 737 deletions
  1. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
  2. 0 651
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
  3. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java
  4. 35 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
  5. 19 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
  6. 281 53
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
  7. 38 30
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
  8. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  9. 599 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
  10. 6 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
  11. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java

@@ -222,6 +222,22 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
       return this;
     }
 
+    /**
+     * Set the <code>executionTypeRequest</code> of the request with 'ensure
+     * execution type' flag set to true.
+     * @see ResourceRequest#setExecutionTypeRequest(
+     * ExecutionTypeRequest)
+     * @param executionType <code>executionType</code> of the request.
+     * @return {@link ResourceRequestBuilder}
+     */
+    @Public
+    @Evolving
+    public ResourceRequestBuilder executionType(ExecutionType executionType) {
+      resourceRequest.setExecutionTypeRequest(
+          ExecutionTypeRequest.newInstance(executionType, true));
+      return this;
+    }
+
     /**
      * Set the <code>allocationRequestId</code> of the request.
      * @see ResourceRequest#setAllocationRequestId(long)

+ 0 - 651
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java

@@ -1,651 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.client.api.impl;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NMToken;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Validates End2End Distributed Scheduling flow which includes the AM
- * specifying OPPORTUNISTIC containers in its resource requests,
- * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor
- * on the NM and the DistributedSchedulingProtocol used by the framework to talk
- * to the OpportunisticContainerAllocatorAMService running on the RM.
- */
-public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
-
-  private static final Log LOG =
-      LogFactory.getLog(TestDistributedScheduling.class);
-
-  protected MiniYARNCluster cluster;
-  protected YarnClient rmClient;
-  protected ApplicationMasterProtocol client;
-  protected Configuration conf;
-  protected Configuration yarnConf;
-  protected ApplicationAttemptId attemptId;
-  protected ApplicationId appId;
-
-  @Before
-  public void doBefore() throws Exception {
-    cluster = new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1);
-
-    conf = new YarnConfiguration();
-    conf.setBoolean(YarnConfiguration.
-        OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
-    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
-    conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
-        10);
-    cluster.init(conf);
-    cluster.start();
-    yarnConf = cluster.getConfig();
-
-    // the client has to connect to AMRMProxy
-    yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
-    rmClient = YarnClient.createYarnClient();
-    rmClient.init(yarnConf);
-    rmClient.start();
-
-    // Submit application
-    attemptId = createApp(rmClient, cluster, conf);
-    appId = attemptId.getApplicationId();
-    client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
-  }
-
-  @After
-  public void doAfter() throws Exception {
-    if (client != null) {
-      try {
-        client.finishApplicationMaster(FinishApplicationMasterRequest
-            .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
-        rmClient.killApplication(attemptId.getApplicationId());
-        attemptId = null;
-      } catch (Exception e) {
-      }
-    }
-    if (rmClient != null) {
-      try {
-        rmClient.stop();
-      } catch (Exception e) {
-      }
-    }
-    if (cluster != null) {
-      try {
-        cluster.stop();
-      } catch (Exception e) {
-      }
-    }
-  }
-
-
-  /**
-   * Validates if Allocate Requests containing only OPPORTUNISTIC container
-   * requests are satisfied instantly.
-   *
-   * @throws Exception
-   */
-  @Test(timeout = 60000)
-  public void testOpportunisticExecutionTypeRequestE2E() throws Exception {
-    LOG.info("testDistributedSchedulingE2E - Register");
-
-    RegisterApplicationMasterResponse responseRegister =
-        client.registerApplicationMaster(RegisterApplicationMasterRequest
-            .newInstance(NetUtils.getHostname(), 1024, ""));
-
-    Assert.assertNotNull(responseRegister);
-    Assert.assertNotNull(responseRegister.getQueue());
-    Assert.assertNotNull(responseRegister.getApplicationACLs());
-    Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
-    Assert
-        .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
-    Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
-    Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
-
-    // Wait until the RM has been updated and verify
-    Map<ApplicationId, RMApp> rmApps =
-        cluster.getResourceManager().getRMContext().getRMApps();
-    boolean rmUpdated = false;
-    for (int i=0; i<10 && !rmUpdated; i++) {
-      sleep(100);
-      RMApp rmApp = rmApps.get(appId);
-      if (rmApp.getState() == RMAppState.RUNNING) {
-        rmUpdated = true;
-      }
-    }
-    RMApp rmApp = rmApps.get(appId);
-    Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
-
-    LOG.info("testDistributedSchedulingE2E - Allocate");
-
-    AllocateRequest request =
-        createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
-
-    // Replace 'ANY' requests with OPPORTUNISTIC asks and remove
-    // everything else
-    List<ResourceRequest> newAskList = new ArrayList<>();
-    for (ResourceRequest rr : request.getAskList()) {
-      if (ResourceRequest.ANY.equals(rr.getResourceName())) {
-        ResourceRequest newRR = ResourceRequest.newInstance(rr
-                .getPriority(), rr.getResourceName(),
-            rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
-            rr.getNodeLabelExpression(),
-            ExecutionTypeRequest.newInstance(
-                ExecutionType.OPPORTUNISTIC, true));
-        newAskList.add(newRR);
-      }
-    }
-    request.setAskList(newAskList);
-
-    AllocateResponse allocResponse = client.allocate(request);
-    Assert.assertNotNull(allocResponse);
-
-    // Ensure that all the requests are satisfied immediately
-    Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
-    // Verify that the allocated containers are OPPORTUNISTIC
-    for (Container allocatedContainer : allocResponse
-        .getAllocatedContainers()) {
-      ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
-          .newContainerTokenIdentifier(
-              allocatedContainer.getContainerToken());
-      Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-          containerTokenIdentifier.getExecutionType());
-    }
-
-    // Check that the RM sees OPPORTUNISTIC containers
-    ResourceScheduler scheduler = cluster.getResourceManager()
-        .getResourceScheduler();
-    for (Container allocatedContainer : allocResponse
-        .getAllocatedContainers()) {
-      ContainerId containerId = allocatedContainer.getId();
-      RMContainer rmContainer = scheduler.getRMContainer(containerId);
-      Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-          rmContainer.getExecutionType());
-    }
-
-    LOG.info("testDistributedSchedulingE2E - Finish");
-  }
-
-  /**
-   * Validates if Allocate Requests containing both GUARANTEED and OPPORTUNISTIC
-   * container requests work as expected.
-   *
-   * @throws Exception
-   */
-  @Test(timeout = 60000)
-  public void testMixedExecutionTypeRequestE2E() throws Exception {
-    LOG.info("testDistributedSchedulingE2E - Register");
-
-    RegisterApplicationMasterResponse responseRegister =
-        client.registerApplicationMaster(RegisterApplicationMasterRequest
-            .newInstance(NetUtils.getHostname(), 1024, ""));
-
-    Assert.assertNotNull(responseRegister);
-    Assert.assertNotNull(responseRegister.getQueue());
-    Assert.assertNotNull(responseRegister.getApplicationACLs());
-    Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey());
-    Assert
-        .assertNotNull(responseRegister.getContainersFromPreviousAttempts());
-    Assert.assertNotNull(responseRegister.getSchedulerResourceTypes());
-    Assert.assertNotNull(responseRegister.getMaximumResourceCapability());
-
-    RMApp rmApp =
-        cluster.getResourceManager().getRMContext().getRMApps().get(appId);
-    Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
-
-    LOG.info("testDistributedSchedulingE2E - Allocate");
-
-    AllocateRequest request =
-        createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING));
-    List<ResourceRequest> askList = request.getAskList();
-    List<ResourceRequest> newAskList = new ArrayList<>(askList);
-
-    // Duplicate all ANY requests marking them as opportunistic
-    for (ResourceRequest rr : askList) {
-      if (ResourceRequest.ANY.equals(rr.getResourceName())) {
-        ResourceRequest newRR = ResourceRequest.newInstance(rr
-                .getPriority(), rr.getResourceName(),
-            rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
-            rr.getNodeLabelExpression(),
-            ExecutionTypeRequest.newInstance(
-                ExecutionType.OPPORTUNISTIC, true));
-        newAskList.add(newRR);
-      }
-    }
-    request.setAskList(newAskList);
-
-    AllocateResponse allocResponse = client.allocate(request);
-    Assert.assertNotNull(allocResponse);
-
-    // Ensure that all the requests are satisfied immediately
-    Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
-    // Verify that the allocated containers are OPPORTUNISTIC
-    for (Container allocatedContainer : allocResponse
-        .getAllocatedContainers()) {
-      ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
-          .newContainerTokenIdentifier(
-              allocatedContainer.getContainerToken());
-      Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-          containerTokenIdentifier.getExecutionType());
-    }
-
-    request.setAskList(new ArrayList<ResourceRequest>());
-    request.setResponseId(request.getResponseId() + 1);
-
-    Thread.sleep(1000);
-
-    // RM should allocate GUARANTEED containers within 2 calls to allocate()
-    allocResponse = client.allocate(request);
-    Assert.assertNotNull(allocResponse);
-    Assert.assertEquals(2, allocResponse.getAllocatedContainers().size());
-
-    // Verify that the allocated containers are GUARANTEED
-    for (Container allocatedContainer : allocResponse
-        .getAllocatedContainers()) {
-      ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
-          .newContainerTokenIdentifier(
-              allocatedContainer.getContainerToken());
-      Assert.assertEquals(ExecutionType.GUARANTEED,
-          containerTokenIdentifier.getExecutionType());
-    }
-
-    LOG.info("testDistributedSchedulingE2E - Finish");
-  }
-
-  /**
-   * Validates if AMRMClient can be used with Distributed Scheduling turned on.
-   *
-   * @throws Exception
-   */
-  @Test(timeout = 120000)
-  @SuppressWarnings("unchecked")
-  public void testAMRMClient() throws Exception {
-    AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null;
-    try {
-      Priority priority = Priority.newInstance(1);
-      Priority priority2 = Priority.newInstance(2);
-      Resource capability = Resource.newInstance(1024, 1);
-
-      List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
-      String node = nodeReports.get(0).getNodeId().getHost();
-      String rack = nodeReports.get(0).getRackName();
-      String[] nodes = new String[]{node};
-      String[] racks = new String[]{rack};
-
-      // start am rm client
-      amClient = new AMRMClientImpl(client);
-      amClient.init(yarnConf);
-      amClient.start();
-      amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
-
-      assertEquals(0, amClient.ask.size());
-      assertEquals(0, amClient.release.size());
-
-      amClient.addContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.addContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.addContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.addContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.addContainerRequest(
-          new AMRMClient.ContainerRequest(capability, null, null, priority2,
-              0, true, null,
-              ExecutionTypeRequest.newInstance(
-                  ExecutionType.OPPORTUNISTIC, true)));
-      amClient.addContainerRequest(
-          new AMRMClient.ContainerRequest(capability, null, null, priority2,
-              0, true, null,
-              ExecutionTypeRequest.newInstance(
-                  ExecutionType.OPPORTUNISTIC, true)));
-
-      amClient.removeContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.removeContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.removeContainerRequest(
-          new AMRMClient.ContainerRequest(capability, null, null, priority2,
-              0, true, null,
-              ExecutionTypeRequest.newInstance(
-                  ExecutionType.OPPORTUNISTIC, true)));
-
-      RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
-          amClient.getTable(0);
-      int containersRequestedNode = remoteRequestsTable.get(priority,
-          node, ExecutionType.GUARANTEED, capability).remoteRequest
-          .getNumContainers();
-      int containersRequestedRack = remoteRequestsTable.get(priority,
-          rack, ExecutionType.GUARANTEED, capability).remoteRequest
-          .getNumContainers();
-      int containersRequestedAny = remoteRequestsTable.get(priority,
-          ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
-          .remoteRequest.getNumContainers();
-      int oppContainersRequestedAny =
-          remoteRequestsTable.get(priority2, ResourceRequest.ANY,
-              ExecutionType.OPPORTUNISTIC, capability).remoteRequest
-              .getNumContainers();
-
-      assertEquals(2, containersRequestedNode);
-      assertEquals(2, containersRequestedRack);
-      assertEquals(2, containersRequestedAny);
-      assertEquals(1, oppContainersRequestedAny);
-
-      assertEquals(4, amClient.ask.size());
-      assertEquals(0, amClient.release.size());
-
-      // RM should allocate container within 2 calls to allocate()
-      int allocatedContainerCount = 0;
-      int iterationsLeft = 10;
-      Set<ContainerId> releases = new TreeSet<>();
-
-      amClient.getNMTokenCache().clearCache();
-      Assert.assertEquals(0,
-          amClient.getNMTokenCache().numberOfTokensInCache());
-      HashMap<String, Token> receivedNMTokens = new HashMap<>();
-
-      while (allocatedContainerCount <
-          (containersRequestedAny + oppContainersRequestedAny)
-          && iterationsLeft-- > 0) {
-        AllocateResponse allocResponse = amClient.allocate(0.1f);
-        assertEquals(0, amClient.ask.size());
-        assertEquals(0, amClient.release.size());
-
-        allocatedContainerCount += allocResponse.getAllocatedContainers()
-            .size();
-        for (Container container : allocResponse.getAllocatedContainers()) {
-          ContainerId rejectContainerId = container.getId();
-          releases.add(rejectContainerId);
-        }
-
-        for (NMToken token : allocResponse.getNMTokens()) {
-          String nodeID = token.getNodeId().toString();
-          receivedNMTokens.put(nodeID, token.getToken());
-        }
-
-        if (allocatedContainerCount < containersRequestedAny) {
-          // sleep to let the NMs heartbeat to the RM and trigger allocations
-          sleep(100);
-        }
-      }
-
-      assertEquals(allocatedContainerCount,
-          containersRequestedAny + oppContainersRequestedAny);
-      for (ContainerId rejectContainerId : releases) {
-        amClient.releaseAssignedContainer(rejectContainerId);
-      }
-      assertEquals(3, amClient.release.size());
-      assertEquals(0, amClient.ask.size());
-
-      // need to tell the AMRMClient that we don't need these resources anymore
-      amClient.removeContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.removeContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
-      amClient.removeContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
-              0, true, null,
-              ExecutionTypeRequest.newInstance(
-                  ExecutionType.OPPORTUNISTIC, true)));
-      assertEquals(4, amClient.ask.size());
-
-      // test RPC exception handling
-      amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
-          nodes, racks, priority));
-      amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability,
-          nodes, racks, priority));
-      amClient.addContainerRequest(
-          new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
-              0, true, null,
-              ExecutionTypeRequest.newInstance(
-                  ExecutionType.OPPORTUNISTIC, true)));
-
-      final AMRMClient amc = amClient;
-      ApplicationMasterProtocol realRM = amClient.rmClient;
-      try {
-        ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol
-            .class);
-        final Resource _capability = capability;
-        final String[] _nodes = nodes;
-        final String[] _racks = racks;
-        final Priority _priority = priority;
-        final Priority _priority2 = priority2;
-
-        when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer(
-            new Answer<AllocateResponse>() {
-              public AllocateResponse answer(InvocationOnMock invocation)
-                  throws Exception {
-                amc.removeContainerRequest(
-                    new AMRMClient.ContainerRequest(_capability, _nodes,
-                        _racks, _priority));
-                amc.removeContainerRequest(
-                    new AMRMClient.ContainerRequest(_capability, _nodes, _racks,
-                        _priority));
-                amc.removeContainerRequest(
-                    new AMRMClient.ContainerRequest(_capability, null, null,
-                        _priority2, 0, true, null,
-                        ExecutionTypeRequest.newInstance(
-                            ExecutionType.OPPORTUNISTIC, true)));
-                throw new Exception();
-              }
-            });
-        amClient.rmClient = mockRM;
-        amClient.allocate(0.1f);
-      } catch (Exception ioe) {
-      } finally {
-        amClient.rmClient = realRM;
-      }
-
-      assertEquals(3, amClient.release.size());
-      assertEquals(6, amClient.ask.size());
-
-      iterationsLeft = 3;
-      // do a few iterations to ensure RM is not going to send new containers
-      while (iterationsLeft-- > 0) {
-        // inform RM of rejection
-        AllocateResponse allocResponse = amClient.allocate(0.1f);
-        // RM did not send new containers because AM does not need any
-        assertEquals(0, allocResponse.getAllocatedContainers().size());
-        if (allocResponse.getCompletedContainersStatuses().size() > 0) {
-          for (ContainerStatus cStatus : allocResponse
-              .getCompletedContainersStatuses()) {
-            if (releases.contains(cStatus.getContainerId())) {
-              assertEquals(cStatus.getState(), ContainerState.COMPLETE);
-              assertEquals(-100, cStatus.getExitStatus());
-              releases.remove(cStatus.getContainerId());
-            }
-          }
-        }
-        if (iterationsLeft > 0) {
-          // sleep to make sure the NM heartbeats
-          sleep(100);
-        }
-      }
-      assertEquals(0, amClient.ask.size());
-      assertEquals(0, amClient.release.size());
-
-      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-          null, null);
-
-    } finally {
-      if (amClient != null && amClient.getServiceState() == Service.STATE
-          .STARTED) {
-        amClient.stop();
-      }
-    }
-  }
-
-  /**
-   * Check if an AM can ask for opportunistic containers and get them.
-   * @throws Exception
-   */
-  @Test
-  public void testAMOpportunistic() throws Exception {
-    // Basic container to request
-    Resource capability = Resource.newInstance(1024, 1);
-    Priority priority = Priority.newInstance(1);
-
-    // Get the cluster topology
-    List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING);
-    String node = nodeReports.get(0).getNodeId().getHost();
-    String rack = nodeReports.get(0).getRackName();
-    String[] nodes = new String[]{node};
-    String[] racks = new String[]{rack};
-
-    // Create an AM to request resources
-    AMRMClient<AMRMClient.ContainerRequest> amClient = null;
-    try {
-      amClient = new AMRMClientImpl<AMRMClient.ContainerRequest>(client);
-      amClient.init(yarnConf);
-      amClient.start();
-      amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, "");
-
-      // AM requests an opportunistic container
-      ExecutionTypeRequest execTypeRequest =
-          ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
-      ContainerRequest containerRequest = new AMRMClient.ContainerRequest(
-          capability, nodes, racks, priority, 0, true, null, execTypeRequest);
-      amClient.addContainerRequest(containerRequest);
-
-      // Wait until the container is allocated
-      ContainerId opportunisticContainerId = null;
-      for (int i=0; i<10 && opportunisticContainerId == null; i++) {
-        AllocateResponse allocResponse = amClient.allocate(0.1f);
-        List<Container> allocatedContainers =
-            allocResponse.getAllocatedContainers();
-        for (Container allocatedContainer : allocatedContainers) {
-          // Check that this is the container we required
-          assertEquals(ExecutionType.OPPORTUNISTIC,
-              allocatedContainer.getExecutionType());
-          opportunisticContainerId = allocatedContainer.getId();
-        }
-        sleep(100);
-      }
-      assertNotNull(opportunisticContainerId);
-
-      // The RM sees the container as OPPORTUNISTIC
-      ResourceScheduler scheduler = cluster.getResourceManager()
-          .getResourceScheduler();
-      RMContainer rmContainer = scheduler.getRMContainer(
-          opportunisticContainerId);
-      assertEquals(ExecutionType.OPPORTUNISTIC,
-          rmContainer.getExecutionType());
-
-      // Release the opportunistic container
-      amClient.releaseAssignedContainer(opportunisticContainerId);
-      // Wait for the release container to appear
-      boolean released = false;
-      for (int i=0; i<10 && !released; i++) {
-        AllocateResponse allocResponse = amClient.allocate(0.1f);
-        List<ContainerStatus> completedContainers =
-            allocResponse.getCompletedContainersStatuses();
-        for (ContainerStatus completedContainer : completedContainers) {
-          ContainerId completedContainerId =
-              completedContainer.getContainerId();
-          assertEquals(completedContainerId, opportunisticContainerId);
-          released = true;
-        }
-        if (!released) {
-          sleep(100);
-        }
-      }
-      assertTrue(released);
-
-      // The RM shouldn't see the container anymore
-      rmContainer = scheduler.getRMContainer(opportunisticContainerId);
-      assertNull(rmContainer);
-
-      // Clean the AM
-      amClient.unregisterApplicationMaster(
-          FinalApplicationStatus.SUCCEEDED, null, null);
-    } finally {
-      if (amClient != null &&
-          amClient.getServiceState() == Service.STATE.STARTED) {
-        amClient.close();
-      }
-    }
-  }
-
-  private void sleep(int sleepTime) {
-    try {
-      Thread.sleep(sleepTime);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-  }
-}

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java

@@ -88,7 +88,7 @@ import static org.junit.Assert.assertNotNull;
  * Class that tests the allocation of OPPORTUNISTIC containers through the
  * centralized ResourceManager.
  */
-public class TestOpportunisticContainerAllocation {
+public class TestOpportunisticContainerAllocationE2E {
   private static Configuration conf = null;
   private static MiniYARNCluster yarnCluster = null;
   private static YarnClient yarnClient = null;

+ 35 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java

@@ -46,6 +46,24 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
     return remoteNode;
   }
 
+  /**
+   * Create new Instance.
+   * @param nodeId NodeId.
+   * @param httpAddress Http address.
+   * @param rackName Rack Name.
+   * @return RemoteNode instance.
+   */
+  @Private
+  @Unstable
+  public static RemoteNode newInstance(NodeId nodeId, String httpAddress,
+      String rackName) {
+    RemoteNode remoteNode = Records.newRecord(RemoteNode.class);
+    remoteNode.setNodeId(nodeId);
+    remoteNode.setHttpAddress(httpAddress);
+    remoteNode.setRackName(rackName);
+    return remoteNode;
+  }
+
   /**
    * Get {@link NodeId}.
    * @return NodeId.
@@ -78,6 +96,22 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
   @Unstable
   public abstract void setHttpAddress(String httpAddress);
 
+  /**
+   * Get Rack Name.
+   * @return Rack Name; the protobuf-backed implementation returns
+   *         {@code null} when no rack name has been set.
+   */
+  @Private
+  @Unstable
+  public abstract String getRackName();
+
+  /**
+   * Set Rack Name.
+   * @param rackName Rack Name; the protobuf-backed implementation clears
+   *                 the field when {@code null} is passed.
+   */
+  @Private
+  @Unstable
+  public abstract void setRackName(String rackName);
+
   /**
    * Use the underlying {@link NodeId} comparator.
    * @param other RemoteNode.
@@ -92,6 +126,7 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
   public String toString() {
+    // Renders node id, rack name and HTTP address; a getter that returns
+    // null is printed as the text "null".
     return "RemoteNode{" +
         "nodeId=" + getNodeId() + ", " +
+        "rackName=" + getRackName() + ", " +
         "httpAddress=" + getHttpAddress() + "}";
   }
 }

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java

@@ -117,6 +117,25 @@ public class RemoteNodePBImpl extends RemoteNode {
     builder.setHttpAddress(httpAddress);
   }
 
+  @Override
+  public String getRackName() {
+    // Read from the proto or from the builder, depending on the viaProto flag.
+    final RemoteNodeProtoOrBuilder source = viaProto ? proto : builder;
+    // An unset optional field is reported as null rather than as a default.
+    return source.hasRackName() ? source.getRackName() : null;
+  }
+
+  @Override
+  public void setRackName(String rackName) {
+    maybeInitBuilder();
+    // A null rack name clears the optional field instead of storing a value.
+    if (rackName != null) {
+      builder.setRackName(rackName);
+    } else {
+      builder.clearRackName();
+    }
+  }
+
   @Override
   public int hashCode() {
     return getProto().hashCode();

+ 281 - 53
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java

@@ -45,11 +45,14 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -61,6 +64,10 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class OpportunisticContainerAllocator {
 
+  // Phases of the candidate search performed while allocating containers:
+  // node-local hints are tried first, then rack-local hints, and from the
+  // off-switch phase onwards any node is considered.
+  private static final int NODE_LOCAL_LOOP = 0;
+  private static final int RACK_LOCAL_LOOP = 1;
+  private static final int OFF_SWITCH_LOOP = 2;
+
   /**
    * This class encapsulates application specific parameters used to build a
    * Container.
@@ -70,6 +77,7 @@ public class OpportunisticContainerAllocator {
     private Resource minResource;
     private Resource incrementResource;
     private int containerTokenExpiryInterval;
+    private int maxAllocationsPerSchedulerKeyPerRound = 1;
 
     /**
      * Return Max Resource.
@@ -135,6 +143,24 @@ public class OpportunisticContainerAllocator {
         int containerTokenExpiryInterval) {
       this.containerTokenExpiryInterval = containerTokenExpiryInterval;
     }
+
+    /**
+     * Get the Max Allocations per Scheduler Key per allocation round.
+     * The value is 1 unless it has been changed through the setter.
+     * @return maxAllocationsPerSchedulerKeyPerRound.
+     */
+    public int getMaxAllocationsPerSchedulerKeyPerRound() {
+      return maxAllocationsPerSchedulerKeyPerRound;
+    }
+
+    /**
+     * Set the Max Allocations per Scheduler Key per allocation round.
+     * The allocator uses this value as an upper bound on the number of
+     * containers it creates for one outstanding request in a single pass.
+     * @param maxAllocationsPerSchedulerKeyPerRound val.
+     */
+    public void setMaxAllocationsPerSchedulerKeyPerRound(
+        int maxAllocationsPerSchedulerKeyPerRound) {
+      this.maxAllocationsPerSchedulerKeyPerRound =
+          maxAllocationsPerSchedulerKeyPerRound;
+    }
   }
 
   /**
@@ -188,6 +214,72 @@ public class OpportunisticContainerAllocator {
 
   private final BaseContainerTokenSecretManager tokenSecretManager;
 
+  /**
+   * Pairs an allocated {@link Container} with the resource name (a host, a
+   * rack, or ANY) that the allocation was made against.
+   */
+  static class Allocation {
+    private final Container container;
+    private final String resourceName;
+
+    Allocation(Container container, String resourceName) {
+      this.container = container;
+      this.resourceName = resourceName;
+    }
+
+    // The container that was created for this allocation.
+    Container getContainer() {
+      return container;
+    }
+
+    // The location the container was allocated for.
+    String getResourceName() {
+      return resourceName;
+    }
+  }
+
+  /**
+   * Wraps a {@link ResourceRequest} together with the node and rack locality
+   * hints registered for it. Each hint carries a counter of how many
+   * allocations may still be made against that location.
+   */
+  static class EnrichedResourceRequest {
+    private final Map<String, AtomicInteger> nodeLocations = new HashMap<>();
+    private final Map<String, AtomicInteger> rackLocations = new HashMap<>();
+    private final ResourceRequest request;
+
+    EnrichedResourceRequest(ResourceRequest request) {
+      this.request = request;
+    }
+
+    ResourceRequest getRequest() {
+      return request;
+    }
+
+    /**
+     * Register, replace or drop the counter of a locality hint.
+     * Names that start with "/" are stored as racks, all others as nodes.
+     * @param location node or rack name.
+     * @param count number of containers wanted at the location; a value that
+     *              is not positive removes the hint.
+     */
+    void addLocation(String location, int count) {
+      Map<String, AtomicInteger> m =
+          location.startsWith("/") ? rackLocations : nodeLocations;
+      // A negative count must not be stored: its counter would never reach
+      // zero on decrement, so the hint would never be dropped again.
+      if (count <= 0) {
+        m.remove(location);
+      } else {
+        m.put(location, new AtomicInteger(count));
+      }
+    }
+
+    /**
+     * Consume one unit of the counter of a locality hint and drop the hint
+     * once the counter is exhausted. Racks are looked up before nodes;
+     * unknown locations are ignored.
+     * @param location node or rack name.
+     */
+    void removeLocation(String location) {
+      Map<String, AtomicInteger> m = rackLocations;
+      AtomicInteger count = m.get(location);
+      if (count == null) {
+        m = nodeLocations;
+        count = m.get(location);
+      }
+
+      if (count != null && count.decrementAndGet() <= 0) {
+        m.remove(location);
+      }
+    }
+
+    // Live view of the node names that still have a hint registered.
+    Set<String> getNodeLocations() {
+      return nodeLocations.keySet();
+    }
+
+    // Live view of the rack names that still have a hint registered.
+    Set<String> getRackLocations() {
+      return rackLocations.keySet();
+    }
+  }
   /**
    * Create a new Opportunistic Container Allocator.
    * @param tokenSecretManager TokenSecretManager
@@ -223,37 +315,55 @@ public class OpportunisticContainerAllocator {
     // Add OPPORTUNISTIC requests to the outstanding ones.
     opportContext.addToOutstandingReqs(oppResourceReqs);
 
-    // Satisfy the outstanding OPPORTUNISTIC requests.
+    Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist());
     List<Container> allocatedContainers = new ArrayList<>();
-    for (SchedulerRequestKey schedulerKey :
-        opportContext.getOutstandingOpReqs().descendingKeySet()) {
-      // Allocated containers :
-      //  Key = Requested Capability,
-      //  Value = List of Containers of given cap (the actual container size
-      //          might be different than what is requested, which is why
-      //          we need the requested capability (key) to match against
-      //          the outstanding reqs)
-      Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
-          opportContext, schedulerKey, applicationAttemptId, appSubmitter);
-      for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
-        opportContext.matchAllocationToOutstandingRequest(
-            e.getKey(), e.getValue());
-        allocatedContainers.addAll(e.getValue());
+
+    // Satisfy the outstanding OPPORTUNISTIC requests.
+    boolean continueLoop = true;
+    while (continueLoop) {
+      continueLoop = false;
+      List<Map<Resource, List<Allocation>>> allocations = new ArrayList<>();
+      for (SchedulerRequestKey schedulerKey :
+          opportContext.getOutstandingOpReqs().descendingKeySet()) {
+        // Allocated containers :
+        //  Key = Requested Capability,
+        //  Value = List of Containers of given cap (the actual container size
+        //          might be different than what is requested, which is why
+        //          we need the requested capability (key) to match against
+        //          the outstanding reqs)
+        Map<Resource, List<Allocation>> allocation = allocate(
+            rmIdentifier, opportContext, schedulerKey, applicationAttemptId,
+            appSubmitter, nodeBlackList);
+        if (allocation.size() > 0) {
+          allocations.add(allocation);
+          continueLoop = true;
+        }
+      }
+      for (Map<Resource, List<Allocation>> allocation : allocations) {
+        for (Map.Entry<Resource, List<Allocation>> e : allocation.entrySet()) {
+          opportContext.matchAllocationToOutstandingRequest(
+              e.getKey(), e.getValue());
+          for (Allocation alloc : e.getValue()) {
+            allocatedContainers.add(alloc.getContainer());
+          }
+        }
       }
     }
 
     return allocatedContainers;
   }
 
-  private Map<Resource, List<Container>> allocate(long rmIdentifier,
+  private Map<Resource, List<Allocation>> allocate(long rmIdentifier,
       OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
-      ApplicationAttemptId appAttId, String userName) throws YarnException {
-    Map<Resource, List<Container>> containers = new HashMap<>();
-    for (ResourceRequest anyAsk :
+      ApplicationAttemptId appAttId, String userName, Set<String> blackList)
+      throws YarnException {
+    Map<Resource, List<Allocation>> containers = new HashMap<>();
+    for (EnrichedResourceRequest enrichedAsk :
         appContext.getOutstandingOpReqs().get(schedKey).values()) {
       allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
-          appContext.getContainerIdGenerator(), appContext.getBlacklist(),
-          appAttId, appContext.getNodeMap(), userName, containers, anyAsk);
+          appContext.getContainerIdGenerator(), blackList, appAttId,
+          appContext.getNodeMap(), userName, containers, enrichedAsk);
+      ResourceRequest anyAsk = enrichedAsk.getRequest();
       if (!containers.isEmpty()) {
         LOG.info("Opportunistic allocation requested for ["
             + "priority=" + anyAsk.getPriority()
@@ -270,44 +380,162 @@ public class OpportunisticContainerAllocator {
       AllocationParams appParams, ContainerIdGenerator idCounter,
       Set<String> blacklist, ApplicationAttemptId id,
       Map<String, RemoteNode> allNodes, String userName,
-      Map<Resource, List<Container>> containers, ResourceRequest anyAsk)
+      Map<Resource, List<Allocation>> allocations,
+      EnrichedResourceRequest enrichedAsk)
       throws YarnException {
+    if (allNodes.size() == 0) {
+      LOG.info("No nodes currently available to " +
+          "allocate OPPORTUNISTIC containers.");
+      return;
+    }
+    ResourceRequest anyAsk = enrichedAsk.getRequest();
     int toAllocate = anyAsk.getNumContainers()
-        - (containers.isEmpty() ? 0 :
-            containers.get(anyAsk.getCapability()).size());
-
-    List<RemoteNode> nodesForScheduling = new ArrayList<>();
-    for (Entry<String, RemoteNode> nodeEntry : allNodes.entrySet()) {
-      // Do not use blacklisted nodes for scheduling.
-      if (blacklist.contains(nodeEntry.getKey())) {
-        continue;
+        - (allocations.isEmpty() ? 0 :
+            allocations.get(anyAsk.getCapability()).size());
+    toAllocate = Math.min(toAllocate,
+        appParams.getMaxAllocationsPerSchedulerKeyPerRound());
+    int numAllocated = 0;
+    // Node Candidates are selected as follows:
+    // * Node local candidates selected in loop == 0
+    // * Rack local candidates selected in loop == 1
+    // * From loop == 2 onwards, we revert to off switch allocations.
+    int loopIndex = OFF_SWITCH_LOOP;
+    if (enrichedAsk.getNodeLocations().size() > 0) {
+      loopIndex = NODE_LOCAL_LOOP;
+    }
+    while (numAllocated < toAllocate) {
+      Collection<RemoteNode> nodeCandidates =
+          findNodeCandidates(loopIndex, allNodes, blacklist, enrichedAsk);
+      for (RemoteNode rNode : nodeCandidates) {
+        String rNodeHost = rNode.getNodeId().getHost();
+        // Ignore black list
+        if (blacklist.contains(rNodeHost)) {
+          LOG.info("Nodes for scheduling has a blacklisted node" +
+              " [" + rNodeHost + "]..");
+          continue;
+        }
+        String location = ResourceRequest.ANY;
+        if (loopIndex == NODE_LOCAL_LOOP) {
+          if (enrichedAsk.getNodeLocations().contains(rNodeHost)) {
+            location = rNodeHost;
+          } else {
+            continue;
+          }
+        }
+        if (loopIndex == RACK_LOCAL_LOOP) {
+          if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) {
+            location = rNode.getRackName();
+          } else {
+            continue;
+          }
+        }
+        Container container = createContainer(rmIdentifier, appParams,
+            idCounter, id, userName, allocations, location,
+            anyAsk, rNode);
+        numAllocated++;
+        // Try to spread the allocations across the nodes.
+        // But don't add if it is a node local request.
+        if (loopIndex != NODE_LOCAL_LOOP) {
+          blacklist.add(rNode.getNodeId().getHost());
+        }
+        LOG.info("Allocated [" + container.getId() + "] as opportunistic at " +
+            "location [" + location + "]");
+        if (numAllocated >= toAllocate) {
+          break;
+        }
+      }
+      if (loopIndex == NODE_LOCAL_LOOP &&
+          enrichedAsk.getRackLocations().size() > 0) {
+        loopIndex = RACK_LOCAL_LOOP;
+      } else {
+        loopIndex++;
+      }
+      // Handle case where there are no nodes remaining after blacklist is
+      // considered.
+      if (loopIndex > OFF_SWITCH_LOOP && numAllocated == 0) {
+        LOG.warn("Unable to allocate any opportunistic containers.");
+        break;
       }
-      nodesForScheduling.add(nodeEntry.getValue());
     }
-    if (nodesForScheduling.isEmpty()) {
-      LOG.warn("No nodes available for allocating opportunistic containers. [" +
-          "allNodes=" + allNodes + ", " +
-          "blacklist=" + blacklist + "]");
-      return;
+  }
+
+  /**
+   * Select the nodes to try in the given phase of the allocation loop.
+   * From the off-switch phase onwards every known node is returned as is.
+   * In the node-local and rack-local phases the hinted nodes are collected
+   * repeatedly until as many candidates as requested containers have been
+   * gathered, so the same node can appear more than once in the result.
+   * @param loopIndex phase of the allocation loop.
+   * @param allNodes all known nodes, keyed by name.
+   * @param blackList hosts that must not be preferred.
+   * @param enrichedRR request with its locality hints.
+   * @return candidate nodes for this phase; may be empty.
+   */
+  private Collection<RemoteNode> findNodeCandidates(int loopIndex,
+      Map<String, RemoteNode> allNodes, Set<String> blackList,
+      EnrichedResourceRequest enrichedRR) {
+    if (loopIndex >= OFF_SWITCH_LOOP) {
+      return allNodes.values();
+    } else {
+      LinkedList<RemoteNode> retList = new LinkedList<>();
+      int numContainers = enrichedRR.getRequest().getNumContainers();
+      while (numContainers > 0) {
+        if (loopIndex == NODE_LOCAL_LOOP) {
+          // Node local candidates
+          numContainers = collectNodeLocalCandidates(
+              allNodes, enrichedRR, retList, numContainers);
+        } else {
+          // Rack local candidates
+          numContainers = collectRackLocalCandidates(
+              allNodes, enrichedRR, retList, blackList, numContainers);
+        }
+        if (numContainers == enrichedRR.getRequest().getNumContainers()) {
+          // If there is no change in numContainers, then there is no point
+          // in looping again.
+          break;
+        }
+      }
+      return retList;
     }
-    int numAllocated = 0;
-    int nextNodeToSchedule = 0;
-    for (int numCont = 0; numCont < toAllocate; numCont++) {
-      nextNodeToSchedule++;
-      nextNodeToSchedule %= nodesForScheduling.size();
-      RemoteNode node = nodesForScheduling.get(nextNodeToSchedule);
-      Container container = buildContainer(rmIdentifier, appParams, idCounter,
-          anyAsk, id, userName, node);
-      List<Container> cList = containers.get(anyAsk.getCapability());
-      if (cList == null) {
-        cList = new ArrayList<>();
-        containers.put(anyAsk.getCapability(), cList);
+  }
+
+  /**
+   * Add the nodes that sit on one of the hinted racks to the candidate list.
+   * Nodes whose host is blacklisted are appended to the tail and do not
+   * count; all other matching nodes are put at the head and reduce the
+   * number of candidates still needed.
+   * @param allNodes all known nodes.
+   * @param enrichedRR request with its rack hints.
+   * @param retList candidate list that is extended in place.
+   * @param blackList blacklisted hosts.
+   * @param numContainers number of candidates still needed.
+   * @return number of candidates still needed after this pass.
+   */
+  private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
+      EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList,
+      Set<String> blackList, int numContainers) {
+    for (RemoteNode rNode : allNodes.values()) {
+      if (enrichedRR.getRackLocations().contains(rNode.getRackName())) {
+        if (blackList.contains(rNode.getNodeId().getHost())) {
+          // Blacklisted: keep it as a last resort without counting it.
+          retList.addLast(rNode);
+        } else {
+          retList.addFirst(rNode);
+          numContainers--;
+        }
+      }
+      // Stop as soon as enough countable candidates have been found.
+      if (numContainers == 0) {
+        break;
+      }
+    }
+    return numContainers;
+  }
+
+  /**
+   * Add the nodes named by the node hints of the request to the candidate
+   * list. Hinted names that are not present in {@code allNodes} are skipped.
+   * @param allNodes all known nodes, looked up by the hinted name.
+   * @param enrichedRR request with its node hints.
+   * @param retList candidate list that is extended in place.
+   * @param numContainers number of candidates still needed.
+   * @return number of candidates still needed after this pass.
+   */
+  private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes,
+      EnrichedResourceRequest enrichedRR, List<RemoteNode> retList,
+      int numContainers) {
+    for (String nodeName : enrichedRR.getNodeLocations()) {
+      RemoteNode remoteNode = allNodes.get(nodeName);
+      if (remoteNode != null) {
+        retList.add(remoteNode);
+        numContainers--;
       }
-      cList.add(container);
-      numAllocated++;
-      LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
+      // Stop as soon as enough candidates have been found.
+      if (numContainers == 0) {
+        break;
+      }
+    }
+    return numContainers;
+  }
+
+  /**
+   * Build a container on the given node and record it, together with the
+   * location it was allocated for, in {@code allocations} under the
+   * capability of the request.
+   * @param allocations allocations made so far, keyed by requested
+   *                    capability; extended in place.
+   * @param location host, rack or ANY the container is allocated for.
+   * @param anyAsk request the container is built for.
+   * @param rNode node the container is placed on.
+   * @return the container that was built.
+   * @throws YarnException if building the container fails.
+   */
+  private Container createContainer(long rmIdentifier,
+      AllocationParams appParams, ContainerIdGenerator idCounter,
+      ApplicationAttemptId id, String userName,
+      Map<Resource, List<Allocation>> allocations, String location,
+      ResourceRequest anyAsk, RemoteNode rNode) throws YarnException {
+    Container container = buildContainer(rmIdentifier, appParams,
+        idCounter, anyAsk, id, userName, rNode);
+    List<Allocation> allocList = allocations.get(anyAsk.getCapability());
+    if (allocList == null) {
+      // First allocation for this capability: create its list.
+      allocList = new ArrayList<>();
+      allocations.put(anyAsk.getCapability(), allocList);
     }
-    LOG.info("Allocated " + numAllocated + " opportunistic containers.");
+    allocList.add(new Allocation(container, location));
+    return container;
   }
 
   private Container buildContainer(long rmIdentifier,

+ 38 - 30
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.scheduler;
 
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
@@ -35,8 +34,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.Allocation;
 import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams;
 import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator;
+import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.EnrichedResourceRequest;
 
 /**
  * This encapsulates application specific information used by the
@@ -53,7 +54,8 @@ public class OpportunisticContainerContext {
       new ContainerIdGenerator();
 
   private volatile List<RemoteNode> nodeList = new LinkedList<>();
-  private final Map<String, RemoteNode> nodeMap = new LinkedHashMap<>();
+  private final LinkedHashMap<String, RemoteNode> nodeMap =
+      new LinkedHashMap<>();
 
   private final Set<String> blacklist = new HashSet<>();
 
@@ -61,7 +63,8 @@ public class OpportunisticContainerContext {
   // Resource Name (host/rack/any) and capability. This mapping is required
   // to match a received Container to an outstanding OPPORTUNISTIC
   // ResourceRequest (ask).
-  private final TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
+  private final TreeMap
+      <SchedulerRequestKey, Map<Resource, EnrichedResourceRequest>>
       outstandingOpReqs = new TreeMap<>();
 
   public AllocationParams getAppParams() {
@@ -107,7 +110,7 @@ public class OpportunisticContainerContext {
     return blacklist;
   }
 
-  public TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
+  public TreeMap<SchedulerRequestKey, Map<Resource, EnrichedResourceRequest>>
       getOutstandingOpReqs() {
     return outstandingOpReqs;
   }
@@ -125,36 +128,32 @@ public class OpportunisticContainerContext {
     for (ResourceRequest request : resourceAsks) {
       SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);
 
-      // TODO: Extend for Node/Rack locality. We only handle ANY requests now
-      if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
-        continue;
-      }
-
-      if (request.getNumContainers() == 0) {
-        continue;
-      }
-
-      Map<Resource, ResourceRequest> reqMap =
+      Map<Resource, EnrichedResourceRequest> reqMap =
           outstandingOpReqs.get(schedulerKey);
       if (reqMap == null) {
         reqMap = new HashMap<>();
         outstandingOpReqs.put(schedulerKey, reqMap);
       }
 
-      ResourceRequest resourceRequest = reqMap.get(request.getCapability());
-      if (resourceRequest == null) {
-        resourceRequest = request;
-        reqMap.put(request.getCapability(), request);
+      EnrichedResourceRequest eReq = reqMap.get(request.getCapability());
+      if (eReq == null) {
+        eReq = new EnrichedResourceRequest(request);
+        reqMap.put(request.getCapability(), eReq);
+      }
+      // Set numContainers only for ANY request
+      if (ResourceRequest.isAnyLocation(request.getResourceName())) {
+        eReq.getRequest().setResourceName(ResourceRequest.ANY);
+        eReq.getRequest().setNumContainers(request.getNumContainers());
       } else {
-        resourceRequest.setNumContainers(
-            resourceRequest.getNumContainers() + request.getNumContainers());
+        eReq.addLocation(request.getResourceName(), request.getNumContainers());
       }
       if (ResourceRequest.isAnyLocation(request.getResourceName())) {
         LOG.info("# of outstandingOpReqs in ANY (at "
             + "priority = " + schedulerKey.getPriority()
             + ", allocationReqId = " + schedulerKey.getAllocationRequestId()
             + ", with capability = " + request.getCapability() + " ) : "
-            + resourceRequest.getNumContainers());
+            + ", with location = " + request.getResourceName() + " ) : "
+            + ", numContainers = " + eReq.getRequest().getNumContainers());
       }
     }
   }
@@ -163,25 +162,34 @@ public class OpportunisticContainerContext {
    * This method matches a returned list of Container Allocations to any
    * outstanding OPPORTUNISTIC ResourceRequest.
    * @param capability Capability
-   * @param allocatedContainers Allocated Containers
+   * @param allocations Allocations.
    */
   public void matchAllocationToOutstandingRequest(Resource capability,
-      List<Container> allocatedContainers) {
-    for (Container c : allocatedContainers) {
+      List<Allocation> allocations) {
+    for (OpportunisticContainerAllocator.Allocation allocation : allocations) {
       SchedulerRequestKey schedulerKey =
-          SchedulerRequestKey.extractFrom(c);
-      Map<Resource, ResourceRequest> asks =
+          SchedulerRequestKey.extractFrom(allocation.getContainer());
+      Map<Resource, EnrichedResourceRequest> asks =
           outstandingOpReqs.get(schedulerKey);
 
       if (asks == null) {
         continue;
       }
 
-      ResourceRequest rr = asks.get(capability);
-      if (rr != null) {
-        rr.setNumContainers(rr.getNumContainers() - 1);
-        if (rr.getNumContainers() == 0) {
+      EnrichedResourceRequest err = asks.get(capability);
+      if (err != null) {
+        int numContainers = err.getRequest().getNumContainers();
+        numContainers--;
+        err.getRequest().setNumContainers(numContainers);
+        if (numContainers == 0) {
           asks.remove(capability);
+          if (asks.size() == 0) {
+            outstandingOpReqs.remove(schedulerKey);
+          }
+        } else {
+          if (!ResourceRequest.isAnyLocation(allocation.getResourceName())) {
+            err.removeLocation(allocation.getResourceName());
+          }
         }
       }
     }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -29,6 +29,7 @@ import "yarn_service_protos.proto";
 message RemoteNodeProto {
   optional NodeIdProto node_id = 1;
   optional string http_address = 2;
+  optional string rack_name = 3;
 }
 
 message RegisterDistributedSchedulingAMResponseProto {

+ 599 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java

@@ -0,0 +1,599 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.scheduler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class TestOpportunisticContainerAllocator {
+
+  private static final int GB = 1024;
+  private OpportunisticContainerAllocator allocator = null;
+  private OpportunisticContainerContext oppCntxt = null;
+
+  /**
+   * Creates a fresh allocator and application context for every test.
+   * The token secret manager is stubbed: it always hands out the same
+   * master key and a fixed password.
+   */
+  @Before
+  public void setup() {
+    SecurityUtil.setTokenServiceUseIp(false);
+    // Stub master key with a fixed id and an empty 8 byte buffer.
+    final MasterKey mKey = new MasterKey() {
+      @Override
+      public int getKeyId() {
+        return 1;
+      }
+      @Override
+      public void setKeyId(int keyId) {}
+      @Override
+      public ByteBuffer getBytes() {
+        return ByteBuffer.allocate(8);
+      }
+      @Override
+      public void setBytes(ByteBuffer bytes) {}
+    };
+    // Secret manager that returns the stub key and a constant password.
+    BaseContainerTokenSecretManager secMan =
+        new BaseContainerTokenSecretManager(new Configuration()) {
+          @Override
+          public MasterKey getCurrentKey() {
+            return mKey;
+          }
+
+          @Override
+          public byte[] createPassword(ContainerTokenIdentifier identifier) {
+            return new byte[]{1, 2};
+          }
+        };
+    allocator = new OpportunisticContainerAllocator(secMan);
+    oppCntxt = new OpportunisticContainerContext();
+    // Resource limits of the application used by all tests.
+    oppCntxt.getAppParams().setMinResource(Resource.newInstance(1024, 1));
+    oppCntxt.getAppParams().setIncrementResource(Resource.newInstance(512, 1));
+    oppCntxt.getAppParams().setMaxResource(Resource.newInstance(1024, 10));
+  }
+
+  /**
+   * A single ANY request against a single node yields one container and
+   * leaves no outstanding request behind.
+   */
+  @Test
+  public void testSimpleAllocation() throws Exception {
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<String>(), new ArrayList<String>());
+    ResourceRequest anyRequest = ResourceRequest.newInstance(
+        Priority.newInstance(1), "*", Resources.createResource(1 * GB), 1,
+        true, null,
+        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true));
+    List<ResourceRequest> reqs = Arrays.asList(anyRequest);
+
+    RemoteNode h1 = RemoteNode.newInstance(
+        NodeId.newInstance("h1", 1234), "h1:1234", "/r1");
+    oppCntxt.updateNodeList(Arrays.asList(h1));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    Assert.assertEquals(1, containers.size());
+    Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size());
+  }
+
+  /**
+   * When every known node is blacklisted nothing is allocated and the
+   * request stays outstanding.
+   */
+  @Test
+  public void testBlacklistRejection() throws Exception {
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            Arrays.asList("h1", "h2"), new ArrayList<String>());
+    ResourceRequest anyRequest = ResourceRequest.newInstance(
+        Priority.newInstance(1), "*", Resources.createResource(1 * GB), 1,
+        true, null,
+        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true));
+    List<ResourceRequest> reqs = Arrays.asList(anyRequest);
+
+    RemoteNode h1 = RemoteNode.newInstance(
+        NodeId.newInstance("h1", 1234), "h1:1234", "/r1");
+    RemoteNode h2 = RemoteNode.newInstance(
+        NodeId.newInstance("h2", 1234), "h2:1234", "/r2");
+    oppCntxt.updateNodeList(Arrays.asList(h1, h2));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    Assert.assertEquals(0, containers.size());
+    Assert.assertEquals(1, oppCntxt.getOutstandingOpReqs().size());
+  }
+
+  /**
+   * Three ANY requests with distinct allocation request ids against three
+   * nodes must be spread so that every node receives one container.
+   */
+  @Test
+  public void testRoundRobinSimpleAllocation() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<String>(), new ArrayList<String>());
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            ResourceRequest.newBuilder().allocationRequestId(1)
+                .priority(Priority.newInstance(1))
+                .resourceName(ResourceRequest.ANY)
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(2)
+                .priority(Priority.newInstance(1))
+                .resourceName(ResourceRequest.ANY)
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(3)
+                .priority(Priority.newInstance(1))
+                .resourceName(ResourceRequest.ANY)
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build());
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h3", 1234), "h3:1234", "/r1")));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    // Check the count first so a failure reports how many were allocated.
+    Assert.assertEquals(3, containers.size());
+    Set<String> allocatedHosts = new HashSet<>();
+    for (Container c : containers) {
+      allocatedHosts.add(c.getNodeHttpAddress());
+    }
+    Assert.assertTrue(allocatedHosts.contains("h1:1234"));
+    Assert.assertTrue(allocatedHosts.contains("h2:1234"));
+    Assert.assertTrue(allocatedHosts.contains("h3:1234"));
+  }
+
+  /**
+   * Submits rack (/r1), node (h1) and ANY asks under two allocation request
+   * ids while h1, h2 and h3 are all known nodes. Expects exactly two
+   * containers, placed on h1 and on neither h2 nor h3.
+   */
+  @Test
+  public void testNodeLocalAllocation() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<String>(), new ArrayList<String>());
+
+    // For each allocation request id (1, then 2) emit the rack, node and
+    // ANY ask in that order.
+    String[] localities = {"/r1", "h1", ResourceRequest.ANY};
+    ResourceRequest[] asks = new ResourceRequest[2 * localities.length];
+    int slot = 0;
+    for (int allocId = 1; allocId <= 2; allocId++) {
+      for (String locality : localities) {
+        asks[slot++] = ResourceRequest.newBuilder()
+            .allocationRequestId(allocId)
+            .priority(Priority.newInstance(1))
+            .resourceName(locality)
+            .capability(Resources.createResource(1 * GB))
+            .relaxLocality(true)
+            .executionType(ExecutionType.OPPORTUNISTIC).build();
+      }
+    }
+    List<ResourceRequest> reqs = Arrays.asList(asks);
+
+    ApplicationId appId = ApplicationId.newInstance(0L, 1);
+    ApplicationAttemptId appAttId =
+        ApplicationAttemptId.newInstance(appId, 1);
+
+    RemoteNode nodeOne = RemoteNode.newInstance(
+        NodeId.newInstance("h1", 1234), "h1:1234", "/r1");
+    RemoteNode nodeTwo = RemoteNode.newInstance(
+        NodeId.newInstance("h2", 1234), "h2:1234", "/r1");
+    RemoteNode nodeThree = RemoteNode.newInstance(
+        NodeId.newInstance("h3", 1234), "h3:1234", "/r1");
+    oppCntxt.updateNodeList(Arrays.asList(nodeOne, nodeTwo, nodeThree));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    System.out.println(containers);
+
+    Set<String> allocatedHosts = new HashSet<>();
+    for (int idx = 0; idx < containers.size(); idx++) {
+      allocatedHosts.add(containers.get(idx).getNodeHttpAddress());
+    }
+    Assert.assertEquals(2, containers.size());
+    Assert.assertTrue(allocatedHosts.contains("h1:1234"));
+    for (String otherHost : new String[] {"h2:1234", "h3:1234"}) {
+      Assert.assertFalse(allocatedHosts.contains(otherHost));
+    }
+  }
+
+  /**
+   * Submits rack (/r1), node (h1) and ANY asks for two containers under a
+   * single allocation request id while h1, h2 and h3 are all known nodes.
+   * Expects exactly two containers, placed on h1 and on neither h2 nor h3.
+   */
+  @Test
+  public void testNodeLocalAllocationSameSchedKey() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<String>(), new ArrayList<String>());
+
+    // Same allocation request id, priority and size; only the resource
+    // name changes between the three asks.
+    String[] localities = {"/r1", "h1", ResourceRequest.ANY};
+    ResourceRequest[] asks = new ResourceRequest[localities.length];
+    for (int idx = 0; idx < localities.length; idx++) {
+      asks[idx] = ResourceRequest.newBuilder().allocationRequestId(2)
+          .numContainers(2)
+          .priority(Priority.newInstance(1))
+          .resourceName(localities[idx])
+          .capability(Resources.createResource(1 * GB))
+          .relaxLocality(true)
+          .executionType(ExecutionType.OPPORTUNISTIC).build();
+    }
+    List<ResourceRequest> reqs = Arrays.asList(asks);
+
+    ApplicationId appId = ApplicationId.newInstance(0L, 1);
+    ApplicationAttemptId appAttId =
+        ApplicationAttemptId.newInstance(appId, 1);
+
+    // Candidate nodes as {host, http address, rack}; all share /r1.
+    String[][] nodeSpecs = {
+        {"h1", "h1:1234", "/r1"},
+        {"h2", "h2:1234", "/r1"},
+        {"h3", "h3:1234", "/r1"}};
+    RemoteNode[] nodes = new RemoteNode[nodeSpecs.length];
+    for (int idx = 0; idx < nodeSpecs.length; idx++) {
+      nodes[idx] = RemoteNode.newInstance(
+          NodeId.newInstance(nodeSpecs[idx][0], 1234),
+          nodeSpecs[idx][1], nodeSpecs[idx][2]);
+    }
+    oppCntxt.updateNodeList(Arrays.asList(nodes));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    System.out.println(containers);
+
+    Set<String> allocatedHosts = new HashSet<>();
+    for (Container allocated : containers) {
+      allocatedHosts.add(allocated.getNodeHttpAddress());
+    }
+    Assert.assertEquals(2, containers.size());
+    Assert.assertTrue(allocatedHosts.contains("h1:1234"));
+    for (String otherHost : new String[] {"h2:1234", "h3:1234"}) {
+      Assert.assertFalse(allocatedHosts.contains(otherHost));
+    }
+  }
+
+  /**
+   * Asks for one container naming "*", node h1 and rack /r1 when h1 is not
+   * in the node list and h2 is the only node registered with /r1. Expects a
+   * single container on h2 and nothing on h3 or h4 (both on /r2).
+   */
+  @Test
+  public void testSimpleRackLocalAllocation() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<String>(), new ArrayList<String>());
+
+    // One ask per resource name, otherwise identical.
+    String[] resourceNames = {"*", "h1", "/r1"};
+    ResourceRequest[] asks = new ResourceRequest[resourceNames.length];
+    for (int idx = 0; idx < resourceNames.length; idx++) {
+      asks[idx] = ResourceRequest.newInstance(
+          Priority.newInstance(1), resourceNames[idx],
+          Resources.createResource(1 * GB), 1, true, null,
+          ExecutionTypeRequest.newInstance(
+              ExecutionType.OPPORTUNISTIC, true));
+    }
+    List<ResourceRequest> reqs = Arrays.asList(asks);
+
+    ApplicationId appId = ApplicationId.newInstance(0L, 1);
+    ApplicationAttemptId appAttId =
+        ApplicationAttemptId.newInstance(appId, 1);
+
+    RemoteNode offRackFirst = RemoteNode.newInstance(
+        NodeId.newInstance("h3", 1234), "h3:1234", "/r2");
+    RemoteNode onRack = RemoteNode.newInstance(
+        NodeId.newInstance("h2", 1234), "h2:1234", "/r1");
+    RemoteNode offRackSecond = RemoteNode.newInstance(
+        NodeId.newInstance("h4", 1234), "h4:1234", "/r2");
+    oppCntxt.updateNodeList(
+        Arrays.asList(offRackFirst, onRack, offRackSecond));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+
+    Set<String> allocatedHosts = new HashSet<>();
+    for (int idx = 0; idx < containers.size(); idx++) {
+      allocatedHosts.add(containers.get(idx).getNodeHttpAddress());
+    }
+    Assert.assertTrue(allocatedHosts.contains("h2:1234"));
+    for (String offRackHost : new String[] {"h3:1234", "h4:1234"}) {
+      Assert.assertFalse(allocatedHosts.contains(offRackHost));
+    }
+    Assert.assertEquals(1, containers.size());
+  }
+
+  /**
+   * Submits rack (/r1), node (h1) and ANY asks under two allocation request
+   * ids when h1 is not in the node list and h2 and h5 are the nodes
+   * registered with /r1. Expects two containers covering both h2 and h5,
+   * and nothing on h3 or h4 (both on /r2).
+   */
+  @Test
+  public void testRoundRobinRackLocalAllocation() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<String>(), new ArrayList<String>());
+
+    // For each allocation request id (1, then 2) emit the rack, node and
+    // ANY ask in that order.
+    String[] localities = {"/r1", "h1", ResourceRequest.ANY};
+    ResourceRequest[] asks = new ResourceRequest[2 * localities.length];
+    int slot = 0;
+    for (int allocId = 1; allocId <= 2; allocId++) {
+      for (String locality : localities) {
+        asks[slot++] = ResourceRequest.newBuilder()
+            .allocationRequestId(allocId)
+            .priority(Priority.newInstance(1))
+            .resourceName(locality)
+            .capability(Resources.createResource(1 * GB))
+            .relaxLocality(true)
+            .executionType(ExecutionType.OPPORTUNISTIC).build();
+      }
+    }
+    List<ResourceRequest> reqs = Arrays.asList(asks);
+
+    ApplicationId appId = ApplicationId.newInstance(0L, 1);
+    ApplicationAttemptId appAttId =
+        ApplicationAttemptId.newInstance(appId, 1);
+
+    // Candidate nodes as {host, http address, rack}.
+    String[][] nodeSpecs = {
+        {"h3", "h3:1234", "/r2"},
+        {"h2", "h2:1234", "/r1"},
+        {"h5", "h5:1234", "/r1"},
+        {"h4", "h4:1234", "/r2"}};
+    RemoteNode[] nodes = new RemoteNode[nodeSpecs.length];
+    for (int idx = 0; idx < nodeSpecs.length; idx++) {
+      nodes[idx] = RemoteNode.newInstance(
+          NodeId.newInstance(nodeSpecs[idx][0], 1234),
+          nodeSpecs[idx][1], nodeSpecs[idx][2]);
+    }
+    oppCntxt.updateNodeList(Arrays.asList(nodes));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+
+    Set<String> allocatedHosts = new HashSet<>();
+    for (Container allocated : containers) {
+      allocatedHosts.add(allocated.getNodeHttpAddress());
+    }
+    System.out.println(containers);
+    for (String onRackHost : new String[] {"h2:1234", "h5:1234"}) {
+      Assert.assertTrue(allocatedHosts.contains(onRackHost));
+    }
+    for (String offRackHost : new String[] {"h3:1234", "h4:1234"}) {
+      Assert.assertFalse(allocatedHosts.contains(offRackHost));
+    }
+    Assert.assertEquals(2, containers.size());
+  }
+
+  /**
+   * Asks for two containers naming "*", node h1 and rack /r1 when h1 is not
+   * in the node list and h2 and h5 are the nodes registered with /r1.
+   * Expects two containers covering both h2 and h5, and nothing on h3 or h4
+   * (both on /r2).
+   */
+  @Test
+  public void testRoundRobinRackLocalAllocationSameSchedKey() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<String>(), new ArrayList<String>());
+
+    // One ask per resource name, each for two containers.
+    String[] resourceNames = {"*", "h1", "/r1"};
+    ResourceRequest[] asks = new ResourceRequest[resourceNames.length];
+    for (int idx = 0; idx < resourceNames.length; idx++) {
+      asks[idx] = ResourceRequest.newInstance(
+          Priority.newInstance(1), resourceNames[idx],
+          Resources.createResource(1 * GB), 2, true, null,
+          ExecutionTypeRequest.newInstance(
+              ExecutionType.OPPORTUNISTIC, true));
+    }
+    List<ResourceRequest> reqs = Arrays.asList(asks);
+
+    ApplicationId appId = ApplicationId.newInstance(0L, 1);
+    ApplicationAttemptId appAttId =
+        ApplicationAttemptId.newInstance(appId, 1);
+
+    RemoteNode h3Node = RemoteNode.newInstance(
+        NodeId.newInstance("h3", 1234), "h3:1234", "/r2");
+    RemoteNode h2Node = RemoteNode.newInstance(
+        NodeId.newInstance("h2", 1234), "h2:1234", "/r1");
+    RemoteNode h5Node = RemoteNode.newInstance(
+        NodeId.newInstance("h5", 1234), "h5:1234", "/r1");
+    RemoteNode h4Node = RemoteNode.newInstance(
+        NodeId.newInstance("h4", 1234), "h4:1234", "/r2");
+    oppCntxt.updateNodeList(Arrays.asList(h3Node, h2Node, h5Node, h4Node));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+
+    Set<String> allocatedHosts = new HashSet<>();
+    for (int idx = 0; idx < containers.size(); idx++) {
+      allocatedHosts.add(containers.get(idx).getNodeHttpAddress());
+    }
+    System.out.println(containers);
+    for (String onRackHost : new String[] {"h2:1234", "h5:1234"}) {
+      Assert.assertTrue(allocatedHosts.contains(onRackHost));
+    }
+    for (String offRackHost : new String[] {"h3:1234", "h4:1234"}) {
+      Assert.assertFalse(allocatedHosts.contains(offRackHost));
+    }
+    Assert.assertEquals(2, containers.size());
+  }
+
+  /**
+   * Asks for two containers naming "*", node h6 and rack /r3, neither of
+   * which appears in the node list (h2-h5 on /r1 and /r2). Expects two
+   * containers to be returned all the same.
+   */
+  @Test
+  public void testOffSwitchAllocationWhenNoNodeOrRack() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<String>(), new ArrayList<String>());
+
+    // One ask per resource name, each for two containers.
+    String[] resourceNames = {"*", "h6", "/r3"};
+    ResourceRequest[] asks = new ResourceRequest[resourceNames.length];
+    for (int idx = 0; idx < resourceNames.length; idx++) {
+      asks[idx] = ResourceRequest.newInstance(
+          Priority.newInstance(1), resourceNames[idx],
+          Resources.createResource(1 * GB), 2, true, null,
+          ExecutionTypeRequest.newInstance(
+              ExecutionType.OPPORTUNISTIC, true));
+    }
+    List<ResourceRequest> reqs = Arrays.asList(asks);
+
+    ApplicationId appId = ApplicationId.newInstance(0L, 1);
+    ApplicationAttemptId appAttId =
+        ApplicationAttemptId.newInstance(appId, 1);
+
+    // Candidate nodes as {host, http address, rack}.
+    String[][] nodeSpecs = {
+        {"h3", "h3:1234", "/r2"},
+        {"h2", "h2:1234", "/r1"},
+        {"h5", "h5:1234", "/r1"},
+        {"h4", "h4:1234", "/r2"}};
+    RemoteNode[] nodes = new RemoteNode[nodeSpecs.length];
+    for (int idx = 0; idx < nodeSpecs.length; idx++) {
+      nodes[idx] = RemoteNode.newInstance(
+          NodeId.newInstance(nodeSpecs[idx][0], 1234),
+          nodeSpecs[idx][1], nodeSpecs[idx][2]);
+    }
+    oppCntxt.updateNodeList(Arrays.asList(nodes));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    System.out.println(containers);
+    Assert.assertEquals(2, containers.size());
+  }
+
+  /**
+   * Asks for 1000 containers naming "*", node h1 and rack /r1, then calls
+   * the allocator 250 times with the same ask list and accumulates the
+   * results. Expects 1000 containers in total across all calls.
+   */
+  @Test
+  public void testLotsOfContainersRackLocalAllocationSameSchedKey()
+      throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<String>(), new ArrayList<String>());
+
+    // One ask per resource name, each for 1000 containers.
+    String[] resourceNames = {"*", "h1", "/r1"};
+    ResourceRequest[] asks = new ResourceRequest[resourceNames.length];
+    for (int idx = 0; idx < resourceNames.length; idx++) {
+      asks[idx] = ResourceRequest.newInstance(
+          Priority.newInstance(1), resourceNames[idx],
+          Resources.createResource(1 * GB), 1000, true, null,
+          ExecutionTypeRequest.newInstance(
+              ExecutionType.OPPORTUNISTIC, true));
+    }
+    List<ResourceRequest> reqs = Arrays.asList(asks);
+
+    ApplicationId appId = ApplicationId.newInstance(0L, 1);
+    ApplicationAttemptId appAttId =
+        ApplicationAttemptId.newInstance(appId, 1);
+
+    RemoteNode h3Node = RemoteNode.newInstance(
+        NodeId.newInstance("h3", 1234), "h3:1234", "/r2");
+    RemoteNode h2Node = RemoteNode.newInstance(
+        NodeId.newInstance("h2", 1234), "h2:1234", "/r1");
+    RemoteNode h5Node = RemoteNode.newInstance(
+        NodeId.newInstance("h5", 1234), "h5:1234", "/r1");
+    RemoteNode h4Node = RemoteNode.newInstance(
+        NodeId.newInstance("h4", 1234), "h4:1234", "/r2");
+    oppCntxt.updateNodeList(Arrays.asList(h3Node, h2Node, h5Node, h4Node));
+
+    // Repeat the identical allocate call and pool whatever each returns.
+    List<Container> containers = new ArrayList<>();
+    for (int round = 1; round <= 250; round++) {
+      List<Container> allocatedThisRound = allocator.allocateContainers(
+          blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+      containers.addAll(allocatedThisRound);
+    }
+    Assert.assertEquals(1000, containers.size());
+  }
+
+  /**
+   * Builds "*", node h1 and rack /r1 asks for each of 100 allocation
+   * request ids, then calls the allocator 25 times with the same ask list
+   * and accumulates the results. Expects 100 containers in total.
+   */
+  @Test
+  public void testLotsOfContainersRackLocalAllocation()
+      throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<String>(), new ArrayList<String>());
+
+    // Allocation request ids run from 1 to 100; each id contributes the
+    // ANY, node and rack ask in that order.
+    String[] localities = {"*", "h1", "/r1"};
+    List<ResourceRequest> reqs = new ArrayList<>();
+    for (int allocId = 1; allocId <= 100; allocId++) {
+      for (String locality : localities) {
+        ResourceRequest ask = ResourceRequest.newBuilder()
+            .allocationRequestId(allocId)
+            .priority(Priority.newInstance(1))
+            .resourceName(locality)
+            .capability(Resources.createResource(1 * GB))
+            .relaxLocality(true)
+            .executionType(ExecutionType.OPPORTUNISTIC).build();
+        reqs.add(ask);
+      }
+    }
+
+    ApplicationId appId = ApplicationId.newInstance(0L, 1);
+    ApplicationAttemptId appAttId =
+        ApplicationAttemptId.newInstance(appId, 1);
+
+    // Candidate nodes as {host, http address, rack}.
+    String[][] nodeSpecs = {
+        {"h3", "h3:1234", "/r2"},
+        {"h2", "h2:1234", "/r1"},
+        {"h5", "h5:1234", "/r1"},
+        {"h4", "h4:1234", "/r2"}};
+    RemoteNode[] nodes = new RemoteNode[nodeSpecs.length];
+    for (int idx = 0; idx < nodeSpecs.length; idx++) {
+      nodes[idx] = RemoteNode.newInstance(
+          NodeId.newInstance(nodeSpecs[idx][0], 1234),
+          nodeSpecs[idx][1], nodeSpecs[idx][2]);
+    }
+    oppCntxt.updateNodeList(Arrays.asList(nodes));
+
+    // Repeat the identical allocate call and pool whatever each returns.
+    List<Container> containers = new ArrayList<>();
+    for (int round = 1; round <= 25; round++) {
+      List<Container> allocatedThisRound = allocator.allocateContainers(
+          blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+      containers.addAll(allocatedThisRound);
+    }
+    Assert.assertEquals(100, containers.size());
+  }
+}

+ 6 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java

@@ -432,8 +432,12 @@ public class OpportunisticContainerAllocatorAMService
   private RemoteNode convertToRemoteNode(NodeId nodeId) {
     SchedulerNode node =
         ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(nodeId);
-    return node != null ? RemoteNode.newInstance(nodeId, node.getHttpAddress())
-        : null;
+    if (node != null) {
+      RemoteNode rNode = RemoteNode.newInstance(nodeId, node.getHttpAddress());
+      // Copy the scheduler node's rack name onto the RemoteNode.
+      rNode.setRackName(node.getRackName());
+      return rNode;
+    }
+    // The scheduler has no node for this id, so there is nothing to convert.
+    return null;
   }
 
   private static ApplicationAttemptId getAppAttemptId() throws YarnException {

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java

@@ -610,6 +610,8 @@ public class TestOpportunisticContainerAllocatorAMService {
                 .newInstance(ExecutionType.OPPORTUNISTIC, true))), null);
     List<Container> allocatedContainers =
         allocateResponse.getAllocatedContainers();
+    allocatedContainers.addAll(
+        am1.allocate(null, null).getAllocatedContainers());
     Assert.assertEquals(2, allocatedContainers.size());
     Container container = allocatedContainers.get(0);
     // Start Container in NM