浏览代码

YARN-2501. Enhanced AMRMClient library to support requests against node labels. Contributed by Wangda Tan.

(cherry picked from commit a5ec3d080978a67837946a991843a081ea712539)
Vinod Kumar Vavilapalli 10 年之前
父节点
当前提交
8dacf89b3e

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -96,6 +96,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2494. Added NodeLabels Manager internal API and implementation. (Wangda
     Tan via vinodkv)
 
+    YARN-2501. Enhanced AMRMClient library to support requests against node
+    labels. (Wangda Tan via vinodkv)
+
   IMPROVEMENTS
 
     YARN-2242. Improve exception information on AM launch crashes. (Li Lu 

+ 34 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java

@@ -105,6 +105,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     final List<String> racks;
     final Priority priority;
     final boolean relaxLocality;
+    final String nodeLabelsExpression;
     
     /**
      * Instantiates a {@link ContainerRequest} with the given constraints and
@@ -124,9 +125,9 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
      */
     public ContainerRequest(Resource capability, String[] nodes,
         String[] racks, Priority priority) {
-      this(capability, nodes, racks, priority, true);
+      this(capability, nodes, racks, priority, true, null);
     }
-          
+    
     /**
      * Instantiates a {@link ContainerRequest} with the given constraints.
      * 
@@ -147,6 +148,32 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
      */
     public ContainerRequest(Resource capability, String[] nodes,
         String[] racks, Priority priority, boolean relaxLocality) {
+      this(capability, nodes, racks, priority, relaxLocality, null);
+    }
+          
+    /**
+     * Instantiates a {@link ContainerRequest} with the given constraints.
+     * 
+     * @param capability
+     *          The {@link Resource} to be requested for each container.
+     * @param nodes
+     *          Any hosts to request that the containers are placed on.
+     * @param racks
+     *          Any racks to request that the containers are placed on. The
+     *          racks corresponding to any hosts requested will be automatically
+     *          added to this list.
+     * @param priority
+     *          The priority at which to request the containers. Higher
+     *          priorities have lower numerical values.
+     * @param relaxLocality
+     *          If true, containers for this request may be assigned on hosts
+     *          and racks other than the ones explicitly requested.
+     * @param nodeLabelsExpression
+     *          Set node labels to allocate resource
+     */
+    public ContainerRequest(Resource capability, String[] nodes,
+        String[] racks, Priority priority, boolean relaxLocality,
+        String nodeLabelsExpression) {
       // Validate request
       Preconditions.checkArgument(capability != null,
           "The Resource to be requested for each container " +
@@ -163,6 +190,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
       this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
       this.priority = priority;
       this.relaxLocality = relaxLocality;
+      this.nodeLabelsExpression = nodeLabelsExpression;
     }
     
     public Resource getCapability() {
@@ -185,6 +213,10 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
       return relaxLocality;
     }
     
+    public String getNodeLabelExpression() {
+      return nodeLabelsExpression;
+    }
+    
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("Capability[").append(capability).append("]");

+ 11 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java

@@ -251,7 +251,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
           // RPC layer is using it to send info across
           askList.add(ResourceRequest.newInstance(r.getPriority(),
               r.getResourceName(), r.getCapability(), r.getNumContainers(),
-              r.getRelaxLocality()));
+              r.getRelaxLocality(), r.getNodeLabelExpression()));
         }
         releaseList = new ArrayList<ContainerId>(release);
         // optimistically clear this collection assuming no RPC failure
@@ -436,25 +436,25 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       }
       for (String node : dedupedNodes) {
         addResourceRequest(req.getPriority(), node, req.getCapability(), req,
-            true);
+            true, req.getNodeLabelExpression());
       }
     }
 
     for (String rack : dedupedRacks) {
       addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
-          true);
+          true, req.getNodeLabelExpression());
     }
 
     // Ensure node requests are accompanied by requests for
     // corresponding rack
     for (String rack : inferredRacks) {
       addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
-          req.getRelaxLocality());
+          req.getRelaxLocality(), req.getNodeLabelExpression());
     }
 
     // Off-switch
     addResourceRequest(req.getPriority(), ResourceRequest.ANY, 
-                    req.getCapability(), req, req.getRelaxLocality());
+        req.getCapability(), req, req.getRelaxLocality(), req.getNodeLabelExpression());
   }
 
   @Override
@@ -608,8 +608,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     ask.add(remoteRequest);
   }
 
-  private void addResourceRequest(Priority priority, String resourceName,
-      Resource capability, T req, boolean relaxLocality) {
+  private void
+      addResourceRequest(Priority priority, String resourceName,
+          Resource capability, T req, boolean relaxLocality,
+          String labelExpression) {
     Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     if (remoteRequests == null) {
@@ -642,6 +644,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     if (relaxLocality) {
       resourceRequestInfo.containerRequests.add(req);
     }
+    
+    resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
 
     // Note this down for next interaction with ResourceManager
     addResourceRequestToAsk(resourceRequestInfo.remoteRequest);

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.client.api.impl;
 
 import com.google.common.base.Supplier;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -147,6 +148,7 @@ public class TestAMRMClient {
     racks = new String[]{ rack };
   }
   
+  @SuppressWarnings("deprecation")
   @Before
   public void startApp() throws Exception {
     // submit new app
@@ -667,6 +669,28 @@ public class TestAMRMClient {
       }
     }
   }
+  
+  @Test(timeout=30000)
+  public void testAskWithNodeLabels() {
+    AMRMClientImpl<ContainerRequest> client =
+        new AMRMClientImpl<ContainerRequest>();
+
+    // add x, y to ANY
+    client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
+        1), null, null, Priority.UNDEFINED, true, "x && y"));
+    Assert.assertEquals(1, client.ask.size());
+    Assert.assertEquals("x && y", client.ask.iterator().next()
+        .getNodeLabelExpression());
+
+    // add x, y and a, b to ANY, only a, b should be kept
+    client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
+        1), null, null, Priority.UNDEFINED, true, "x && y"));
+    client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
+        1), null, null, Priority.UNDEFINED, true, "a && b"));
+    Assert.assertEquals(1, client.ask.size());
+    Assert.assertEquals("a && b", client.ask.iterator().next()
+        .getNodeLabelExpression());
+  }
     
   private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)  
       throws YarnException, IOException {