Ver código fonte

YARN-752. In AMRMClient, automatically add corresponding rack requests for requested nodes. (sandyr via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1493599 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur 12 anos atrás
pai
commit
02f87683e3

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -372,6 +372,9 @@ Release 2.1.0-beta - UNRELEASED
     YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then
     YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then
     use them for authentication with NMs. (Omkar Vinit Joshi via vinodkv)
     use them for authentication with NMs. (Omkar Vinit Joshi via vinodkv)
 
 
+    YARN-752. In AMRMClient, automatically add corresponding rack requests for 
+    requested nodes. (sandyr via tucu)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     YARN-512. Log aggregation root directory check is more expensive than it
     YARN-512. Log aggregation root directory check is more expensive than it

+ 26 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java

@@ -42,23 +42,36 @@ import com.google.common.collect.ImmutableList;
 public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Service {
 public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Service {
 
 
   /**
   /**
-   * Object to represent container request for resources.
-   * Resources may be localized to nodes and racks.
-   * Resources may be assigned priorities.
-   * All getters return unmodifiable collections.
-   * Can ask for multiple containers of a given type.
+   * Object to represent container request for resources. Scheduler
+   * documentation should be consulted for the specifics of how the parameters
+   * are honored.
+   * All getters return immutable values.
+   * 
+   * @param capability
+   *    The {@link Resource} to be requested for each container.
+   * @param nodes
+   *    Any hosts to request that the containers are placed on.
+   * @param racks
+   *    Any racks to request that the containers are placed on. The racks
+   *    corresponding to any hosts requested will be automatically added to
+   *    this list.
+   * @param priority
+   *    The priority at which to request the containers. Higher priorities have
+   *    lower numerical values.
+   * @param containerCount
+   *    The number of containers to request.
    */
    */
   public static class ContainerRequest {
   public static class ContainerRequest {
     final Resource capability;
     final Resource capability;
-    final ImmutableList<String> hosts;
-    final ImmutableList<String> racks;
+    final List<String> nodes;
+    final List<String> racks;
     final Priority priority;
     final Priority priority;
     final int containerCount;
     final int containerCount;
         
         
-    public ContainerRequest(Resource capability, String[] hosts,
+    public ContainerRequest(Resource capability, String[] nodes,
         String[] racks, Priority priority, int containerCount) {
         String[] racks, Priority priority, int containerCount) {
       this.capability = capability;
       this.capability = capability;
-      this.hosts = (hosts != null ? ImmutableList.copyOf(hosts) : null);
+      this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
       this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
       this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
       this.priority = priority;
       this.priority = priority;
       this.containerCount = containerCount;
       this.containerCount = containerCount;
@@ -68,8 +81,8 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
       return capability;
       return capability;
     }
     }
     
     
-    public List<String> getHosts() {
-      return hosts;
+    public List<String> getNodes() {
+      return nodes;
     }
     }
     
     
     public List<String> getRacks() {
     public List<String> getRacks() {
@@ -103,9 +116,9 @@ public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Servi
    * AMRMClient can remove it from its internal store.
    * AMRMClient can remove it from its internal store.
    */
    */
   public static class StoredContainerRequest extends ContainerRequest {    
   public static class StoredContainerRequest extends ContainerRequest {    
-    public StoredContainerRequest(Resource capability, String[] hosts,
+    public StoredContainerRequest(Resource capability, String[] nodes,
         String[] racks, Priority priority) {
         String[] racks, Priority priority) {
-      super(capability, hosts, racks, priority, 1);
+      super(capability, nodes, racks, priority, 1);
     }
     }
   }
   }
   
   

+ 61 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java

@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.List;
@@ -64,6 +65,9 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.RackResolver;
+
+import com.google.common.base.Joiner;
 
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
 
 
@@ -139,7 +143,7 @@ public class AMRMClientImpl<T extends ContainerRequest>
   
   
   //Key -> Priority
   //Key -> Priority
   //Value -> Map
   //Value -> Map
-  //Key->ResourceName (e.g., hostname, rackname, *)
+  //Key->ResourceName (e.g., nodename, rackname, *)
   //Value->Map
   //Value->Map
   //Key->Resource Capability
   //Key->Resource Capability
   //Value->ResourceRequest
   //Value->ResourceRequest
@@ -160,6 +164,7 @@ public class AMRMClientImpl<T extends ContainerRequest>
 
 
   @Override
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
   protected void serviceInit(Configuration conf) throws Exception {
+    RackResolver.init(conf);
     super.serviceInit(conf);
     super.serviceInit(conf);
   }
   }
 
 
@@ -309,22 +314,37 @@ public class AMRMClientImpl<T extends ContainerRequest>
   
   
   @Override
   @Override
   public synchronized void addContainerRequest(T req) {
   public synchronized void addContainerRequest(T req) {
-    // Create resource requests
-    // add check for dup locations
-    if (req.hosts != null) {
-      for (String host : req.hosts) {
-        addResourceRequest(req.priority, host, req.capability,
-            req.containerCount, req);
+    Set<String> allRacks = new HashSet<String>();
+    if (req.racks != null) {
+      allRacks.addAll(req.racks);
+      if(req.racks.size() != allRacks.size()) {
+        Joiner joiner = Joiner.on(',');
+        LOG.warn("ContainerRequest has duplicate racks: "
+            + joiner.join(req.racks));
       }
       }
     }
     }
-
-    if (req.racks != null) {
-      for (String rack : req.racks) {
-        addResourceRequest(req.priority, rack, req.capability,
+    allRacks.addAll(resolveRacks(req.nodes));
+    
+    if (req.nodes != null) {
+      HashSet<String> dedupedNodes = new HashSet<String>(req.nodes);
+      if(dedupedNodes.size() != req.nodes.size()) {
+        Joiner joiner = Joiner.on(',');
+        LOG.warn("ContainerRequest has duplicate nodes: "
+            + joiner.join(req.nodes));        
+      }
+      for (String node : dedupedNodes) {
+        // Ensure node requests are accompanied by requests for
+        // corresponding rack
+        addResourceRequest(req.priority, node, req.capability,
             req.containerCount, req);
             req.containerCount, req);
       }
       }
     }
     }
 
 
+    for (String rack : allRacks) {
+      addResourceRequest(req.priority, rack, req.capability,
+          req.containerCount, req);
+    }
+
     // Off-switch
     // Off-switch
     addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
     addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
         req.containerCount, req);
         req.containerCount, req);
@@ -332,19 +352,23 @@ public class AMRMClientImpl<T extends ContainerRequest>
 
 
   @Override
   @Override
   public synchronized void removeContainerRequest(T req) {
   public synchronized void removeContainerRequest(T req) {
+    Set<String> allRacks = new HashSet<String>();
+    if (req.racks != null) {
+      allRacks.addAll(req.racks);
+    }
+    allRacks.addAll(resolveRacks(req.nodes));
+
     // Update resource requests
     // Update resource requests
-    if (req.hosts != null) {
-      for (String hostName : req.hosts) {
-        decResourceRequest(req.priority, hostName, req.capability,
+    if (req.nodes != null) {
+      for (String node : new HashSet<String>(req.nodes)) {
+        decResourceRequest(req.priority, node, req.capability,
             req.containerCount, req);
             req.containerCount, req);
       }
       }
     }
     }
 
 
-    if (req.racks != null) {
-      for (String rack : req.racks) {
-        decResourceRequest(req.priority, rack, req.capability,
-            req.containerCount, req);
-      }
+    for (String rack : allRacks) {
+      decResourceRequest(req.priority, rack, req.capability,
+          req.containerCount, req);
     }
     }
 
 
     decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
     decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
@@ -404,6 +428,24 @@ public class AMRMClientImpl<T extends ContainerRequest>
     return list;          
     return list;          
   }
   }
   
   
+  private Set<String> resolveRacks(List<String> nodes) {
+    Set<String> racks = new HashSet<String>();    
+    if (nodes != null) {
+      for (String node : nodes) {
+        // Ensure node requests are accompanied by requests for
+        // corresponding rack
+        String rack = RackResolver.resolve(node).getNetworkLocation();
+        if (rack == null) {
+          LOG.warn("Failed to resolve rack for node " + node + ".");
+        } else {
+          racks.add(rack);
+        }
+      }
+    }
+    
+    return racks;
+  }
+  
   private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
   private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
     // This code looks weird but is needed because of the following scenario.
     // This code looks weird but is needed because of the following scenario.
     // A ResourceRequest is removed from the remoteRequestTable. A 0 container 
     // A ResourceRequest is removed from the remoteRequestTable. A 0 container 

+ 45 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java

@@ -267,6 +267,51 @@ public class TestAMRMClient {
     assertTrue(matches.size() == 1);
     assertTrue(matches.size() == 1);
     assertTrue(matches.get(0).size() == matchSize);    
     assertTrue(matches.get(0).size() == matchSize);    
   }
   }
+  
+  @Test (timeout=60000)
+  public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException {
+    AMRMClientImpl<StoredContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+      
+      Resource capability = Resource.newInstance(1024, 2);
+
+      StoredContainerRequest storedContainer1 = 
+          new StoredContainerRequest(capability, nodes, null, priority);
+      amClient.addContainerRequest(storedContainer1);
+
+      // verify matching with original node and inferred rack
+      List<? extends Collection<StoredContainerRequest>> matches;
+      StoredContainerRequest storedRequest;
+      // exact match node
+      matches = amClient.getMatchingRequests(priority, node, capability);
+      verifyMatches(matches, 1);
+      storedRequest = matches.get(0).iterator().next();
+      assertTrue(storedContainer1 == storedRequest);
+      // inferred match rack
+      matches = amClient.getMatchingRequests(priority, rack, capability);
+      verifyMatches(matches, 1);
+      storedRequest = matches.get(0).iterator().next();
+      assertTrue(storedContainer1 == storedRequest);
+      
+      // inferred rack match no longer valid after request is removed
+      amClient.removeContainerRequest(storedContainer1);
+      matches = amClient.getMatchingRequests(priority, rack, capability);
+      assertTrue(matches.isEmpty());
+      
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
 
 
   @Test (timeout=60000)
   @Test (timeout=60000)
   public void testAMRMClientMatchStorage() throws YarnException, IOException {
   public void testAMRMClientMatchStorage() throws YarnException, IOException {

+ 80 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.junit.Test;
+
+import static org.apache.hadoop.yarn.client.AMRMClientImpl.ContainerRequest;
+import static org.junit.Assert.assertEquals;
+
+public class TestAMRMClientContainerRequest {
+  @Test
+  public void testFillInRacks() {
+    AMRMClientImpl<ContainerRequest> client = new AMRMClientImpl<ContainerRequest>(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0));
+    
+    Configuration conf = new Configuration();
+    conf.setClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        MyResolver.class, DNSToSwitchMapping.class);
+    client.init(conf);
+ 
+    Resource capability = Resource.newInstance(1024, 1);
+    ContainerRequest request =
+        new ContainerRequest(capability, new String[] {"host1", "host2"},
+            new String[] {"/rack2"}, Priority.newInstance(1), 4);
+    client.addContainerRequest(request);
+    verifyResourceRequestLocation(client, request, "host1");
+    verifyResourceRequestLocation(client, request, "host2");
+    verifyResourceRequestLocation(client, request, "/rack1");
+    verifyResourceRequestLocation(client, request, "/rack2");
+    verifyResourceRequestLocation(client, request, ResourceRequest.ANY);
+  }
+  
+  private static class MyResolver implements DNSToSwitchMapping {
+
+    @Override
+    public List<String> resolve(List<String> names) {
+      return Arrays.asList("/rack1");
+    }
+
+    @Override
+    public void reloadCachedMappings() {}
+  }
+  
+  private void verifyResourceRequestLocation(
+      AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
+      String location) {
+    ResourceRequest ask =  client.remoteRequestsTable.get(request.priority)
+        .get(location).get(request.capability).remoteRequest;
+    assertEquals(location, ask.getResourceName());
+    assertEquals(request.getContainerCount(), ask.getNumContainers());
+  }
+}