Browse Source

MAPREDUCE-6304. Specifying node labels when submitting MR jobs. (Naganarasimha G R via wangda)

Wangda Tan 10 years ago
parent
commit
3164e7d838

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -270,6 +270,9 @@ Release 2.8.0 - UNRELEASED
    MAPREDUCE-6364. Add a "Kill" link to Task Attempts page. (Ryu Kobayashi
    via ozawa)
 
+   MAPREDUCE-6304. Specifying node labels when submitting MR jobs.
+   (Naganarasimha G R via wangda)
+
   IMPROVEMENTS
 
     MAPREDUCE-6291. Correct mapred queue usage command.

+ 15 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -180,6 +180,10 @@ public class RMContainerAllocator extends RMContainerRequestor
 
   private ScheduleStats scheduleStats = new ScheduleStats();
 
+  private String mapNodeLabelExpression;
+
+  private String reduceNodeLabelExpression;
+
   public RMContainerAllocator(ClientService clientService, AppContext context,
       AMPreemptionPolicy preemptionPolicy) {
     super(clientService, context);
@@ -210,6 +214,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     RackResolver.init(conf);
     retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
                                 MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+    mapNodeLabelExpression = conf.get(MRJobConfig.MAP_NODE_LABEL_EXP);
+    reduceNodeLabelExpression = conf.get(MRJobConfig.REDUCE_NODE_LABEL_EXP);
     // Init startTime to current time. If all goes well, it will be reset after
     // first attempt to contact RM.
     retrystartTime = System.currentTimeMillis();
@@ -396,9 +402,11 @@ public class RMContainerAllocator extends RMContainerRequestor
           reduceResourceRequest.getVirtualCores());
         if (reqEvent.getEarlierAttemptFailed()) {
           //add to the front of queue for fail fast
-          pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
+          pendingReduces.addFirst(new ContainerRequest(reqEvent,
+              PRIORITY_REDUCE, reduceNodeLabelExpression));
         } else {
-          pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
+          pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE,
+              reduceNodeLabelExpression));
           //reduces are added to pending and are slowly ramped up
         }
       }
@@ -951,7 +959,9 @@ public class RMContainerAllocator extends RMContainerRequestor
       
       if (event.getEarlierAttemptFailed()) {
         earlierFailedMaps.add(event.getAttemptID());
-        request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
+        request =
+            new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP,
+                mapNodeLabelExpression);
         LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
       } else {
         for (String host : event.getHosts()) {
@@ -976,7 +986,8 @@ public class RMContainerAllocator extends RMContainerRequestor
             LOG.debug("Added attempt req to rack " + rack);
          }
        }
-       request = new ContainerRequest(event, PRIORITY_MAP);
+        request =
+            new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);
       }
       maps.put(event.getAttemptID(), request);
       addContainerReq(request);

+ 20 - 12
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java

@@ -121,39 +121,43 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     final String[] racks;
     //final boolean earlierAttemptFailed;
     final Priority priority;
+    final String nodeLabelExpression;
+
     /**
      * the time when this request object was formed; can be used to avoid
      * aggressive preemption for recently placed requests
      */
     final long requestTimeMs;
 
-    public ContainerRequest(ContainerRequestEvent event, Priority priority) {
+    public ContainerRequest(ContainerRequestEvent event, Priority priority,
+        String nodeLabelExpression) {
       this(event.getAttemptID(), event.getCapability(), event.getHosts(),
-          event.getRacks(), priority);
+          event.getRacks(), priority, nodeLabelExpression);
     }
 
     public ContainerRequest(ContainerRequestEvent event, Priority priority,
                             long requestTimeMs) {
       this(event.getAttemptID(), event.getCapability(), event.getHosts(),
-          event.getRacks(), priority, requestTimeMs);
+          event.getRacks(), priority, requestTimeMs,null);
     }
 
     public ContainerRequest(TaskAttemptId attemptID,
                             Resource capability, String[] hosts, String[] racks,
-                            Priority priority) {
+                            Priority priority, String nodeLabelExpression) {
       this(attemptID, capability, hosts, racks, priority,
-          System.currentTimeMillis());
+          System.currentTimeMillis(), nodeLabelExpression);
     }
 
     public ContainerRequest(TaskAttemptId attemptID,
         Resource capability, String[] hosts, String[] racks,
-        Priority priority, long requestTimeMs) {
+        Priority priority, long requestTimeMs,String nodeLabelExpression) {
       this.attemptID = attemptID;
       this.capability = capability;
       this.hosts = hosts;
       this.racks = racks;
       this.priority = priority;
       this.requestTimeMs = requestTimeMs;
+      this.nodeLabelExpression = nodeLabelExpression;
     }
     
     public String toString() {
@@ -390,17 +394,20 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     for (String host : req.hosts) {
       // Data-local
       if (!isNodeBlacklisted(host)) {
-        addResourceRequest(req.priority, host, req.capability);
-      }      
+        addResourceRequest(req.priority, host, req.capability,
+            null);
+      }
     }
 
     // Nothing Rack-local for now
     for (String rack : req.racks) {
-      addResourceRequest(req.priority, rack, req.capability);
+      addResourceRequest(req.priority, rack, req.capability,
+          null);
     }
 
     // Off-switch
-    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
+    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
+        req.nodeLabelExpression);
   }
 
   protected void decContainerReq(ContainerRequest req) {
@@ -417,7 +424,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   }
 
   private void addResourceRequest(Priority priority, String resourceName,
-      Resource capability) {
+      Resource capability, String nodeLabelExpression) {
     Map<String, Map<Resource, ResourceRequest>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     if (remoteRequests == null) {
@@ -439,6 +446,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
       remoteRequest.setResourceName(resourceName);
       remoteRequest.setCapability(capability);
       remoteRequest.setNumContainers(0);
+      remoteRequest.setNodeLabelExpression(nodeLabelExpression);
       reqMap.put(capability, remoteRequest);
     }
     remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
@@ -533,7 +541,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     }
     String[] hosts = newHosts.toArray(new String[newHosts.size()]);
     ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability,
-        hosts, orig.racks, orig.priority); 
+        hosts, orig.racks, orig.priority, orig.nodeLabelExpression);
     return newReq;
   }
   

+ 88 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java

@@ -79,6 +79,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -491,7 +492,7 @@ public class TestRMContainerAllocator {
     ContainerRequestEvent event1 =
         createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
     scheduledRequests.maps.put(mock(TaskAttemptId.class),
-        new RMContainerRequestor.ContainerRequest(event1, null));
+        new RMContainerRequestor.ContainerRequest(event1, null,null));
     assignedRequests.reduces.put(mock(TaskAttemptId.class),
         mock(Container.class));
 
@@ -561,6 +562,91 @@ public class TestRMContainerAllocator {
         assignedRequests.preemptionWaitingReduces.size());
   }
 
+  @Test
+  public void testMapReduceAllocationWithNodeLabelExpression() throws Exception {
+
+    LOG.info("Running testMapReduceAllocationWithNodeLabelExpression");
+    Configuration conf = new Configuration();
+    /*
+     * final int MAP_LIMIT = 3; final int REDUCE_LIMIT = 1;
+     * conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
+     * conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
+     */
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
+    conf.set(MRJobConfig.MAP_NODE_LABEL_EXP, "MapNodes");
+    conf.set(MRJobConfig.REDUCE_NODE_LABEL_EXP, "ReduceNodes");
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+    MyContainerAllocator allocator =
+        new MyContainerAllocator(null, conf, appAttemptId, mockJob) {
+          @Override
+          protected void register() {
+          }
+
+          @Override
+          protected ApplicationMasterProtocol createSchedulerProxy() {
+            return mockScheduler;
+          }
+        };
+
+    // create some map requests
+    ContainerRequestEvent reqMapEvents;
+    reqMapEvents = createReq(jobId, 0, 1024, new String[] { "map" });
+    allocator.sendRequests(Arrays.asList(reqMapEvents));
+
+    // create some reduce requests
+    ContainerRequestEvent reqReduceEvents;
+    reqReduceEvents =
+        createReq(jobId, 0, 2048, new String[] { "reduce" }, false, true);
+    allocator.sendRequests(Arrays.asList(reqReduceEvents));
+    allocator.schedule();
+    // verify all of the host-specific asks were sent plus one for the
+    // default rack and one for the ANY request
+    Assert.assertEquals(3, mockScheduler.lastAsk.size());
+    // verify ResourceRequest sent for MAP have appropriate node
+    // label expression as per the configuration
+    validateLabelsRequests(mockScheduler.lastAsk.get(0), false);
+    validateLabelsRequests(mockScheduler.lastAsk.get(1), false);
+    validateLabelsRequests(mockScheduler.lastAsk.get(2), false);
+
+    // assign a map task and verify we do not ask for any more maps
+    ContainerId cid0 = mockScheduler.assignContainer("map", false);
+    allocator.schedule();
+    // default rack and one for the ANY request
+    Assert.assertEquals(3, mockScheduler.lastAsk.size());
+    validateLabelsRequests(mockScheduler.lastAsk.get(0), true);
+    validateLabelsRequests(mockScheduler.lastAsk.get(1), true);
+    validateLabelsRequests(mockScheduler.lastAsk.get(2), true);
+
+    // shut down the allocator now that the asks have been verified
+    allocator.close();
+  }
+
+  private void validateLabelsRequests(ResourceRequest resourceRequest,
+      boolean isReduce) {
+    switch (resourceRequest.getResourceName()) {
+    case "map":
+    case "reduce":
+    case NetworkTopology.DEFAULT_RACK:
+      Assert.assertNull(resourceRequest.getNodeLabelExpression());
+      break;
+    case "*":
+      Assert.assertEquals(isReduce ? "ReduceNodes" : "MapNodes",
+          resourceRequest.getNodeLabelExpression());
+      break;
+    default:
+      Assert.fail("Invalid resource location "
+          + resourceRequest.getResourceName());
+    }
+  }
+
   @Test
   public void testMapReduceScheduling() throws Exception {
 
@@ -1498,6 +1584,7 @@ public class TestRMContainerAllocator {
             .getNumContainers(), req.getRelaxLocality());
         askCopy.add(reqCopy);
       }
+      SecurityUtil.setTokenServiceUseIp(false);
       lastAsk = ask;
       lastRelease = release;
       lastBlacklistAdditions = blacklistAdditions;

+ 20 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -71,6 +71,26 @@ public interface MRJobConfig {
 
   public static final String QUEUE_NAME = "mapreduce.job.queuename";
 
+  /**
+   *  Node Label expression applicable for all Job containers.
+   */
+  public static final String JOB_NODE_LABEL_EXP = "mapreduce.job.node-label-expression";
+
+  /**
+   * Node Label expression applicable for AM containers.
+   */
+  public static final String AM_NODE_LABEL_EXP = "mapreduce.job.am.node-label-expression";
+
+  /**
+   *  Node Label expression applicable for map containers.
+   */
+  public static final String MAP_NODE_LABEL_EXP = "mapreduce.map.node-label-expression";
+
+  /**
+   * Node Label expression applicable for reduce containers.
+   */
+  public static final String REDUCE_NODE_LABEL_EXP = "mapreduce.reduce.node-label-expression";
+
   public static final String RESERVATION_ID = "mapreduce.job.reservation.id";
 
   public static final String JOB_TAGS = "mapreduce.job.tags";

+ 35 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -1112,6 +1112,41 @@
 
 <!-- MR YARN Application properties -->
 
+<property>
+ <name>mapreduce.job.node-label-expression</name>
+  <description>All the containers of the Map Reduce job will be run with this
+  node label expression. If it is not set for the job, the queue's
+  default-node-label-expression is used for all of the job's containers.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.job.am.node-label-expression</name>
+  <description>This is node-label configuration for Map Reduce Application Master
+  container. If not configured, mapreduce.job.node-label-expression is used;
+  if the job's node-label expression is not configured either, the queue's
+  default-node-label-expression is used.
+  </description>
+</property>
+
+<property>
+ <name>mapreduce.map.node-label-expression</name>
+  <description>This is node-label configuration for Map task containers. If not
+  configured, mapreduce.job.node-label-expression is used; if the job's
+  node-label expression is not configured either, the queue's
+  default-node-label-expression is used.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.reduce.node-label-expression</name>
+  <description>This is node-label configuration for Reduce task containers. If
+  not configured, mapreduce.job.node-label-expression is used; if the job's
+  node-label expression is not configured either, the queue's
+  default-node-label-expression is used.
+  </description>
+</property>
+
 <property>
  <name>mapreduce.job.counters.limit</name>
   <value>120</value>

+ 29 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java

@@ -76,8 +76,10 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -97,7 +99,15 @@ public class YARNRunner implements ClientProtocol {
 
   private static final Log LOG = LogFactory.getLog(YARNRunner.class);
 
-  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  private final static RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  public final static Priority AM_CONTAINER_PRIORITY = recordFactory
+      .newRecordInstance(Priority.class);
+  static {
+    AM_CONTAINER_PRIORITY.setPriority(0);
+  }
+
   private ResourceMgrDelegate resMgrDelegate;
   private ClientCache clientCache;
   private Configuration conf;
@@ -525,6 +535,24 @@ public class YARNRunner implements ClientProtocol {
         conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
             MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
     appContext.setResource(capability);
+
+    // set labels for the AM container request if present
+    String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP);
+    if (null != amNodelabelExpression
+        && amNodelabelExpression.trim().length() != 0) {
+      ResourceRequest amResourceRequest =
+          recordFactory.newRecordInstance(ResourceRequest.class);
+      amResourceRequest.setPriority(AM_CONTAINER_PRIORITY);
+      amResourceRequest.setResourceName(ResourceRequest.ANY);
+      amResourceRequest.setCapability(capability);
+      amResourceRequest.setNumContainers(1);
+      amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
+      appContext.setAMContainerResourceRequest(amResourceRequest);
+    }
+    // set labels for the Job containers
+    appContext.setNodeLabelExpression(jobConf
+        .get(JobContext.JOB_NODE_LABEL_EXP));
+
     appContext.setApplicationType(MRJobConfig.MR_APPLICATION_TYPE);
     if (tagsFromConf != null && !tagsFromConf.isEmpty()) {
       appContext.setApplicationTags(new HashSet<String>(tagsFromConf));

+ 16 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java

@@ -546,6 +546,22 @@ public class TestYARNRunner extends TestCase {
     throw new IllegalStateException("Profiler opts not found!");
   }
 
+  @Test
+  public void testNodeLabelExp() throws Exception {
+    JobConf jobConf = new JobConf();
+
+    jobConf.set(MRJobConfig.JOB_NODE_LABEL_EXP, "GPU");
+    jobConf.set(MRJobConfig.AM_NODE_LABEL_EXP, "highMem");
+
+    YARNRunner yarnRunner = new YARNRunner(jobConf);
+    ApplicationSubmissionContext appSubCtx =
+        buildSubmitContext(yarnRunner, jobConf);
+
+    assertEquals(appSubCtx.getNodeLabelExpression(), "GPU");
+    assertEquals(appSubCtx.getAMContainerResourceRequest()
+        .getNodeLabelExpression(), "highMem");
+  }
+
   @Test
   public void testAMStandardEnv() throws Exception {
     final String ADMIN_LIB_PATH = "foo";