
MAPREDUCE-6926. Allow MR jobs to opt out of oversubscription. Contributed by Haibo Chen.

Miklos Szegedi · 7 years ago · commit 8757aa90a7

+ 25 - 23
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java

@@ -111,6 +111,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
       .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
   private final Set<String> blacklistRemovals = Collections
       .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+  private boolean optOutOfOversubscription;
 
   public RMContainerRequestor(ClientService clientService, AppContext context) {
     super(clientService, context);
@@ -136,22 +137,17 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     public ContainerRequest(ContainerRequestEvent event, Priority priority,
         String nodeLabelExpression) {
       this(event.getAttemptID(), event.getCapability(), event.getHosts(),
-          event.getRacks(), priority, nodeLabelExpression);
+          event.getRacks(), priority, System.currentTimeMillis(),
+          nodeLabelExpression);
     }
 
+    @VisibleForTesting
     public ContainerRequest(ContainerRequestEvent event, Priority priority,
                             long requestTimeMs) {
       this(event.getAttemptID(), event.getCapability(), event.getHosts(),
           event.getRacks(), priority, requestTimeMs,null);
     }
 
-    public ContainerRequest(TaskAttemptId attemptID,
-                            Resource capability, String[] hosts, String[] racks,
-                            Priority priority, String nodeLabelExpression) {
-      this(attemptID, capability, hosts, racks, priority,
-          System.currentTimeMillis(), nodeLabelExpression);
-    }
-
     public ContainerRequest(TaskAttemptId attemptID,
         Resource capability, String[] hosts, String[] racks,
         Priority priority, long requestTimeMs,String nodeLabelExpression) {
@@ -186,6 +182,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
             MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
             MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
     LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
+    optOutOfOversubscription = conf.getBoolean(
+        MRJobConfig.MR_OVERSUBSCRIPTION_OPT_OUT,
+        MRJobConfig.DEFAULT_MR_OVERSUBSCRIPTION_OPT_OUT);
+    LOG.info("optOutOfOversubscription is " + optOutOfOversubscription);
     if (blacklistDisablePercent < -1 || blacklistDisablePercent > 100) {
       throw new YarnRuntimeException("Invalid blacklistDisablePercent: "
           + blacklistDisablePercent
@@ -398,20 +398,20 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     for (String host : req.hosts) {
       // Data-local
       if (!isNodeBlacklisted(host)) {
-        addResourceRequest(req.priority, host, req.capability,
+        addGuaranteedResourceRequest(req.priority, host, req.capability,
             null);
       }
     }
 
     // Nothing Rack-local for now
     for (String rack : req.racks) {
-      addResourceRequest(req.priority, rack, req.capability,
+      addGuaranteedResourceRequest(req.priority, rack, req.capability,
           null);
     }
 
     // Off-switch
-    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
-        req.nodeLabelExpression);
+    addGuaranteedResourceRequest(req.priority, ResourceRequest.ANY,
+        req.capability, req.nodeLabelExpression);
   }
 
   protected void decContainerReq(ContainerRequest req) {
@@ -430,18 +430,18 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   protected void addOpportunisticResourceRequest(Priority priority,
       Resource capability) {
     addResourceRequest(priority, ResourceRequest.ANY, capability, null,
-        ExecutionType.OPPORTUNISTIC);
+        ExecutionType.OPPORTUNISTIC, true);
   }
 
-  private void addResourceRequest(Priority priority, String resourceName,
-      Resource capability, String nodeLabelExpression) {
+  private void addGuaranteedResourceRequest(Priority priority,
+      String resourceName, Resource capability, String nodeLabelExpression) {
     addResourceRequest(priority, resourceName, capability, nodeLabelExpression,
-        ExecutionType.GUARANTEED);
+        ExecutionType.GUARANTEED, optOutOfOversubscription);
   }
 
   private void addResourceRequest(Priority priority, String resourceName,
       Resource capability, String nodeLabelExpression,
-      ExecutionType executionType) {
+      ExecutionType executionType, boolean enforceExecutionType) {
     Map<String, Map<Resource, ResourceRequest>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     if (remoteRequests == null) {
@@ -464,8 +464,8 @@ public abstract class RMContainerRequestor extends RMCommunicator {
       remoteRequest.setCapability(capability);
       remoteRequest.setNumContainers(0);
       remoteRequest.setNodeLabelExpression(nodeLabelExpression);
-      remoteRequest.setExecutionTypeRequest(
-          ExecutionTypeRequest.newInstance(executionType, true));
+      remoteRequest.setExecutionTypeRequest(ExecutionTypeRequest.
+          newInstance(executionType, enforceExecutionType));
       reqMap.put(capability, remoteRequest);
     }
     remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
@@ -473,9 +473,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     // Note this down for next interaction with ResourceManager
     addResourceRequestToAsk(remoteRequest);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("addResourceRequest:" + " applicationId="
+      LOG.debug("addGuaranteedResourceRequest:" + " applicationId="
           + applicationId.getId() + " priority=" + priority.getPriority()
-          + " resourceName=" + resourceName + " numContainers="
+          + " resourceName=" + resourceName + " ExecutionType=" + executionType
+          + " enforceExecutionType=" + enforceExecutionType + " numContainers="
           + remoteRequest.getNumContainers() + " #asks=" + ask.size());
     }
   }
@@ -559,8 +560,9 @@ public abstract class RMContainerRequestor extends RMCommunicator {
       }
     }
     String[] hosts = newHosts.toArray(new String[newHosts.size()]);
-    ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability,
-        hosts, orig.racks, orig.priority, orig.nodeLabelExpression);
+    ContainerRequest newReq = new ContainerRequest(orig.attemptID,
+        orig.capability, hosts, orig.racks, orig.priority,
+        System.currentTimeMillis(), orig.nodeLabelExpression);
     return newReq;
   }
 

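For reference, the net effect of the new enforceExecutionType argument is visible in the ResourceRequest the AM sends to the RM: a GUARANTEED request whose enforcement bit mirrors mapreduce.job.oversubscription-opt-out. The following is a minimal illustrative sketch (not part of the commit) of such a request; the priority, capability, and container count are placeholder values.

  import org.apache.hadoop.yarn.api.records.ExecutionType;
  import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
  import org.apache.hadoop.yarn.api.records.Priority;
  import org.apache.hadoop.yarn.api.records.Resource;
  import org.apache.hadoop.yarn.api.records.ResourceRequest;

  public class GuaranteedRequestSketch {
    // Mirrors what addResourceRequest() now does: the job-level opt-out flag
    // is forwarded as enforceExecutionType on the ExecutionTypeRequest.
    public static ResourceRequest newGuaranteedRequest(boolean optOutOfOversubscription) {
      ResourceRequest request = ResourceRequest.newInstance(
          Priority.newInstance(20),        // placeholder priority
          ResourceRequest.ANY,             // off-switch request (any node)
          Resource.newInstance(1024, 1),   // placeholder capability: 1 GB, 1 vcore
          1);                              // one container
      // When the flag is true, the scheduler must honor GUARANTEED and may not
      // run the container in oversubscribed (opportunistic) resources; when it
      // is false (the default), oversubscription remains allowed.
      request.setExecutionTypeRequest(ExecutionTypeRequest.newInstance(
          ExecutionType.GUARANTEED, optOutOfOversubscription));
      return request;
    }
  }
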
+ 192 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java

@@ -105,6 +105,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -868,6 +870,196 @@ public class TestRMContainerAllocator {
     allocator.close();
   }
 
+  /**
+   * Test that a MapReduce job can be configured to opt out of
+   * oversubscription, that is, it always waits for guaranteed resources to
+   * execute its tasks. This is done by setting up a MapReduce job with 2
+   * mappers and 1 reducer and capturing all ResourceRequests sent from the
+   * AM to the RM, then checking that all ResourceRequests are guaranteed and
+   * their enforceExecutionType is true.
+   */
+  @Test
+  public void testMapReduceOptingOutOversubscription() throws Exception {
+    List<ResourceRequest> resourceRequests = captureResourceRequests(true);
+
+    for(ResourceRequest resourceRequest : resourceRequests) {
+      ExecutionTypeRequest executionTypeRequest =
+          resourceRequest.getExecutionTypeRequest();
+      if (!executionTypeRequest.equals(ExecutionTypeRequest.newInstance(
+          ExecutionType.GUARANTEED, true))) {
+        Assert.fail("The execution type of ResourceRequest " + resourceRequest +
+            " is not guaranteed or not enforced.");
+      }
+    }
+  }
+
+  /**
+   * Test that a MapReduce job can be configured to opt in to oversubscription
+   * (the default behavior). This is done by setting up a MapReduce job with 2
+   * mappers and 1 reducer and capturing all ResourceRequests sent from
+   * the AM to the RM, then checking that all ResourceRequests are guaranteed
+   * but their enforceExecutionType is always set to false.
+   */
+  @Test
+  public void testMapReduceOptingInOversubscription() throws Exception {
+    List<ResourceRequest> resourceRequests = captureResourceRequests(false);
+
+    for(ResourceRequest resourceRequest : resourceRequests) {
+      ExecutionTypeRequest executionTypeRequest =
+          resourceRequest.getExecutionTypeRequest();
+      if (!executionTypeRequest.equals(ExecutionTypeRequest.newInstance(
+          ExecutionType.GUARANTEED, false))) {
+        Assert.fail("The execution type of ResourceRequest " + resourceRequest +
+            " is not guaranteed or it is enforced.");
+      }
+    }
+  }
+
+  /**
+   * Set up a MapReduce job with 2 mappers and 1 reducer and return
+   * all ResourceRequests sent from the AM to the RM.
+   */
+  private List<ResourceRequest> captureResourceRequests(
+      boolean optOutOfOversubscription) throws Exception {
+    List<ResourceRequest> resourceRequests = new ArrayList<>();
+
+    final Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_OVERSUBSCRIPTION_OPT_OUT,
+        optOutOfOversubscription);
+
+    // start the resource manager
+    final MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // submit an application
+    RMApp rmApp = rm.submitApp(1024);
+    rm.drainEvents();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 11264);
+    amNodeManager.nodeHeartbeat(true);
+    rm.drainEvents();
+
+    final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    rm.drainEvents();
+
+    // start the MR AM and wait until it is in running state
+    MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId(
+        appAttemptId, 0), 2, 1, false,
+        this.getClass().getName(), true, 1) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return new DrainDispatcher();
+      }
+      protected ContainerAllocator createContainerAllocator(
+          ClientService clientService, AppContext context) {
+        return new MyContainerAllocator(rm, appAttemptId, context);
+      };
+    };
+    mrApp.submit(conf);
+
+    Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next()
+        .getValue();
+    DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
+    MyContainerAllocator allocator = (MyContainerAllocator) mrApp
+        .getContainerAllocator();
+    mrApp.waitForInternalState((JobImpl)job, JobStateInternal.RUNNING);
+    amDispatcher.await();
+
+    // wait until all attempts request for containers
+    for (Task t : job.getTasks().values()) {
+      mrApp.waitForInternalState((TaskAttemptImpl) t.getAttempts().values()
+          .iterator().next(), TaskAttemptStateInternal.UNASSIGNED);
+    }
+    amDispatcher.await();
+
+    // send map resource requests to RM
+    allocator.schedule();
+    rm.drainEvents();
+    for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+      resourceRequests.add(ResourceRequest.newInstance(
+          rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+          rr.getNumContainers(), rr.getRelaxLocality(),
+          rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+    }
+
+    // wait for both map tasks to be running
+    amNodeManager.nodeHeartbeat(true);
+    rm.drainEvents();
+
+    allocator.schedule();
+    rm.drainEvents();
+    for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+      resourceRequests.add(ResourceRequest.newInstance(
+          rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+          rr.getNumContainers(), rr.getRelaxLocality(),
+          rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+    }
+
+    for (Task t : job.getTasks().values()) {
+      if (t.getType() == TaskType.MAP) {
+        mrApp.waitForState(t, TaskState.RUNNING);
+      }
+    }
+
+    // finish both map tasks so that the reduce task can be scheduled
+    Iterator<Task> it = job.getTasks().values().iterator();
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2);
+    allocator.schedule();
+    rm.drainEvents();
+    for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+      resourceRequests.add(ResourceRequest.newInstance(
+          rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+          rr.getNumContainers(), rr.getRelaxLocality(),
+          rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+    }
+
+    // send the reduce resource requests to RM
+    allocator.schedule();
+    rm.drainEvents();
+    for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+      resourceRequests.add(ResourceRequest.newInstance(
+          rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+          rr.getNumContainers(), rr.getRelaxLocality(),
+          rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+    }
+
+    // wait for the reduce task to be running
+    amNodeManager.nodeHeartbeat(true);
+    rm.drainEvents();
+
+    allocator.schedule();
+    rm.drainEvents();
+    for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+      resourceRequests.add(ResourceRequest.newInstance(
+          rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+          rr.getNumContainers(), rr.getRelaxLocality(),
+          rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+    }
+
+    for (Task t : job.getTasks().values()) {
+      if (t.getType() == TaskType.REDUCE) {
+        mrApp.waitForState(t, TaskState.RUNNING);
+      }
+    }
+
+    // finish the reduce task
+    finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1);
+    allocator.schedule();
+    rm.drainEvents();
+    for (ResourceRequest rr : rm.getMyFifoScheduler().lastAsk) {
+      resourceRequests.add(ResourceRequest.newInstance(
+          rr.getPriority(), rr.getResourceName(), rr.getCapability(),
+          rr.getNumContainers(), rr.getRelaxLocality(),
+          rr.getNodeLabelExpression(), rr.getExecutionTypeRequest()));
+    }
+
+    return resourceRequests;
+  }
+
   @Test
   public void testMapReduceScheduling() throws Exception {
 

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -1178,6 +1178,12 @@ public interface MRJobConfig {
       "mapreduce.job.num-opportunistic-maps-percent";
       "mapreduce.job.num-opportunistic-maps-percent";
   public static final int DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PERCENT = 0;
   public static final int DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PERCENT = 0;
 
 
+  /**
+   * Opt out of YARN oversubscription so that the job always waits for
+   * GUARANTEED resources to become available in the cluster.
+   */
+  String MR_OVERSUBSCRIPTION_OPT_OUT = "mapreduce.job.oversubscription-opt-out";
+  boolean DEFAULT_MR_OVERSUBSCRIPTION_OPT_OUT = false;
   /**
    * A comma-separated list of properties whose value will be redacted.
    */

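A job opts out at submission time by setting the new property on its Configuration. Here is a minimal sketch, assuming a standard Job submission; the job name is a placeholder and mapper/reducer setup is omitted.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;

  public class OversubscriptionOptOutJob {
    public static Job newOptedOutJob() throws Exception {
      Configuration conf = new Configuration();
      // MRJobConfig.MR_OVERSUBSCRIPTION_OPT_OUT ("mapreduce.job.oversubscription-opt-out"):
      // ask the MR AM to send only enforced GUARANTEED container requests,
      // so none of this job's tasks run in oversubscribed resources.
      conf.setBoolean("mapreduce.job.oversubscription-opt-out", true);
      return Job.getInstance(conf, "opt-out-of-oversubscription"); // placeholder name
    }
  }
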
+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -2151,4 +2151,12 @@
   </description>
 </property>
 
+<property>
+  <description>
+    Opts out of YARN oversubscription so that the job always waits for GUARANTEED
+    resources to become available.
+  </description>
+  <name>mapreduce.job.oversubscription-opt-out</name>
+  <value>false</value>
+</property>
 </configuration>
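
Because mapred-default.xml ships the default as false, existing jobs keep opting in to oversubscription. A cluster can change the default in mapred-site.xml, and an individual job can override it at submission time, either programmatically (as sketched above) or, for drivers that go through ToolRunner/GenericOptionsParser, with -Dmapreduce.job.oversubscription-opt-out=true on the command line.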