Browse Source

MAPREDUCE-5279. Made MR headroom calculation honor cpu dimension when YARN scheduler resource type is memory plus cpu. Contributed by Peng Zhang and Varun Vasudev.

Zhijie Shen 10 năm trước cách đây
mục cha
commit
376233cdd4

+ 4 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -254,6 +254,10 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5891. Improved shuffle error handling across NM restarts
     (Junping Du via jlowe)
 
+    MAPREDUCE-5279. Made MR headroom calculation honor cpu dimension when YARN
+    scheduler resource type is memory plus cpu. (Peng Zhang and Varun Vasudev
+    via zjshen)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 11 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 
 /**
  * Registers/unregisters to RM and sends heartbeats to RM.
@@ -90,6 +92,8 @@ public abstract class RMCommunicator extends AbstractService
   private volatile boolean shouldUnregister = true;
   private boolean isApplicationMasterRegistered = false;
 
+  private EnumSet<SchedulerResourceTypes> schedulerResourceTypes;
+
   public RMCommunicator(ClientService clientService, AppContext context) {
     super("RMCommunicator");
     this.clientService = clientService;
@@ -98,6 +102,7 @@ public abstract class RMCommunicator extends AbstractService
     this.applicationId = context.getApplicationID();
     this.stopped = new AtomicBoolean(false);
     this.heartbeatCallbacks = new ConcurrentLinkedQueue<Runnable>();
+    this.schedulerResourceTypes = EnumSet.of(SchedulerResourceTypes.MEMORY);
   }
 
   @Override
@@ -163,10 +168,11 @@ public abstract class RMCommunicator extends AbstractService
         setClientToAMToken(response.getClientToAMTokenMasterKey());        
       }
       this.applicationACLs = response.getApplicationACLs();
-      LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
+      LOG.info("maxContainerCapability: " + maxContainerCapability);
       String queue = response.getQueue();
       LOG.info("queue: " + queue);
       job.setQueueName(queue);
+      this.schedulerResourceTypes.addAll(response.getSchedulerResourceTypes());
     } catch (Exception are) {
       LOG.error("Exception while registering", are);
       throw new YarnRuntimeException(are);
@@ -343,4 +349,8 @@ public abstract class RMCommunicator extends AbstractService
   protected boolean isApplicationMasterRegistered() {
     return isApplicationMasterRegistered;
   }
+
+  public EnumSet<SchedulerResourceTypes> getSchedulerResourceTypes() {
+    return schedulerResourceTypes;
+  }
 }

+ 165 - 107
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -149,8 +151,8 @@ public class RMContainerAllocator extends RMContainerRequestor
   private int lastCompletedTasks = 0;
   
   private boolean recalculateReduceSchedule = false;
-  private int mapResourceRequest;//memory
-  private int reduceResourceRequest;//memory
+  private Resource mapResourceRequest = Resources.none();
+  private Resource reduceResourceRequest = Resources.none();
   
   private boolean reduceStarted = false;
   private float maxReduceRampupLimit = 0;
@@ -328,49 +330,61 @@ public class RMContainerAllocator extends RMContainerRequestor
     if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
       ContainerRequestEvent reqEvent = (ContainerRequestEvent) event;
       JobId jobId = getJob().getID();
-      int supportedMaxContainerCapability =
-          getMaxContainerCapability().getMemory();
+      Resource supportedMaxContainerCapability = getMaxContainerCapability();
       if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
-        if (mapResourceRequest == 0) {
-          mapResourceRequest = reqEvent.getCapability().getMemory();
-          eventHandler.handle(new JobHistoryEvent(jobId, 
-              new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
-                  mapResourceRequest)));
-          LOG.info("mapResourceRequest:"+ mapResourceRequest);
-          if (mapResourceRequest > supportedMaxContainerCapability) {
-            String diagMsg = "MAP capability required is more than the supported " +
-            "max container capability in the cluster. Killing the Job. mapResourceRequest: " +
-                mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability;
+        if (mapResourceRequest.equals(Resources.none())) {
+          mapResourceRequest = reqEvent.getCapability();
+          eventHandler.handle(new JobHistoryEvent(jobId,
+            new NormalizedResourceEvent(
+              org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest
+                .getMemory())));
+          LOG.info("mapResourceRequest:" + mapResourceRequest);
+          if (mapResourceRequest.getMemory() > supportedMaxContainerCapability
+            .getMemory()
+              || mapResourceRequest.getVirtualCores() > supportedMaxContainerCapability
+                .getVirtualCores()) {
+            String diagMsg =
+                "MAP capability required is more than the supported "
+                    + "max container capability in the cluster. Killing the Job. mapResourceRequest: "
+                    + mapResourceRequest + " maxContainerCapability:"
+                    + supportedMaxContainerCapability;
             LOG.info(diagMsg);
-            eventHandler.handle(new JobDiagnosticsUpdateEvent(
-                jobId, diagMsg));
+            eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
             eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
           }
         }
-        //set the rounded off memory
-        reqEvent.getCapability().setMemory(mapResourceRequest);
+        // set the resources
+        reqEvent.getCapability().setMemory(mapResourceRequest.getMemory());
+        reqEvent.getCapability().setVirtualCores(
+          mapResourceRequest.getVirtualCores());
         scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
       } else {
-        if (reduceResourceRequest == 0) {
-          reduceResourceRequest = reqEvent.getCapability().getMemory();
-          eventHandler.handle(new JobHistoryEvent(jobId, 
-              new NormalizedResourceEvent(
-                  org.apache.hadoop.mapreduce.TaskType.REDUCE,
-                  reduceResourceRequest)));
-          LOG.info("reduceResourceRequest:"+ reduceResourceRequest);
-          if (reduceResourceRequest > supportedMaxContainerCapability) {
-            String diagMsg = "REDUCE capability required is more than the " +
-            		"supported max container capability in the cluster. Killing the " +
-            		"Job. reduceResourceRequest: " + reduceResourceRequest +
-            		" maxContainerCapability:" + supportedMaxContainerCapability;
+        if (reduceResourceRequest.equals(Resources.none())) {
+          reduceResourceRequest = reqEvent.getCapability();
+          eventHandler.handle(new JobHistoryEvent(jobId,
+            new NormalizedResourceEvent(
+              org.apache.hadoop.mapreduce.TaskType.REDUCE,
+              reduceResourceRequest.getMemory())));
+          LOG.info("reduceResourceRequest:" + reduceResourceRequest);
+          if (reduceResourceRequest.getMemory() > supportedMaxContainerCapability
+            .getMemory()
+              || reduceResourceRequest.getVirtualCores() > supportedMaxContainerCapability
+                .getVirtualCores()) {
+            String diagMsg =
+                "REDUCE capability required is more than the "
+                    + "supported max container capability in the cluster. Killing the "
+                    + "Job. reduceResourceRequest: " + reduceResourceRequest
+                    + " maxContainerCapability:"
+                    + supportedMaxContainerCapability;
             LOG.info(diagMsg);
-            eventHandler.handle(new JobDiagnosticsUpdateEvent(
-                jobId, diagMsg));
+            eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
             eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
           }
         }
-        //set the rounded off memory
-        reqEvent.getCapability().setMemory(reduceResourceRequest);
+        // set the resources
+        reqEvent.getCapability().setMemory(reduceResourceRequest.getMemory());
+        reqEvent.getCapability().setVirtualCores(
+          reduceResourceRequest.getVirtualCores());
         if (reqEvent.getEarlierAttemptFailed()) {
           //add to the front of queue for fail fast
           pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
@@ -425,34 +439,40 @@ public class RMContainerAllocator extends RMContainerRequestor
 
   @Private
   @VisibleForTesting
-  synchronized void setReduceResourceRequest(int mem) {
-    this.reduceResourceRequest = mem;
+  synchronized void setReduceResourceRequest(Resource res) {
+    this.reduceResourceRequest = res;
   }
 
   @Private
   @VisibleForTesting
-  synchronized void setMapResourceRequest(int mem) {
-    this.mapResourceRequest = mem;
+  synchronized void setMapResourceRequest(Resource res) {
+    this.mapResourceRequest = res;
   }
 
   @Private
   @VisibleForTesting
   void preemptReducesIfNeeded() {
-    if (reduceResourceRequest == 0) {
-      return; //no reduces
+    if (reduceResourceRequest.equals(Resources.none())) {
+      return; // no reduces
     }
     //check if reduces have taken over the whole cluster and there are 
     //unassigned maps
     if (scheduledRequests.maps.size() > 0) {
-      int memLimit = getMemLimit();
-      int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
-          assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest);
-      //availableMemForMap must be sufficient to run atleast 1 map
-      if (availableMemForMap < mapResourceRequest) {
-        //to make sure new containers are given to maps and not reduces
-        //ramp down all scheduled reduces if any
-        //(since reduces are scheduled at higher priority than maps)
-        LOG.info("Ramping down all scheduled reduces:" + scheduledRequests.reduces.size());
+      Resource resourceLimit = getResourceLimit();
+      Resource availableResourceForMap =
+          Resources.subtract(
+            resourceLimit,
+            Resources.multiply(reduceResourceRequest,
+              assignedRequests.reduces.size()
+                  - assignedRequests.preemptionWaitingReduces.size()));
+      // availableMemForMap must be sufficient to run at least 1 map
+      if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
+        mapResourceRequest, getSchedulerResourceTypes()) <= 0) {
+        // to make sure new containers are given to maps and not reduces
+        // ramp down all scheduled reduces if any
+        // (since reduces are scheduled at higher priority than maps)
+        LOG.info("Ramping down all scheduled reduces:"
+            + scheduledRequests.reduces.size());
         for (ContainerRequest req : scheduledRequests.reduces.values()) {
           pendingReduces.add(req);
         }
@@ -462,17 +482,25 @@ public class RMContainerAllocator extends RMContainerRequestor
         //hanging around for a while
         int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
         if (hangingMapRequests > 0) {
-          //preempt for making space for at least one map
-          int premeptionLimit = Math.max(mapResourceRequest,
-              (int) (maxReducePreemptionLimit * memLimit));
-
-          int preemptMem = Math.min(hangingMapRequests * mapResourceRequest,
-              premeptionLimit);
-
-          int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest);
-          toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
-
-          LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
+          // preempt for making space for at least one map
+          int preemptionReduceNumForOneMap =
+              ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest,
+                reduceResourceRequest, getSchedulerResourceTypes());
+          int preemptionReduceNumForPreemptionLimit =
+              ResourceCalculatorUtils.divideAndCeilContainers(
+                Resources.multiply(resourceLimit, maxReducePreemptionLimit),
+                reduceResourceRequest, getSchedulerResourceTypes());
+          int preemptionReduceNumForAllMaps =
+              ResourceCalculatorUtils.divideAndCeilContainers(
+                Resources.multiply(mapResourceRequest, hangingMapRequests),
+                reduceResourceRequest, getSchedulerResourceTypes());
+          int toPreempt =
+              Math.min(Math.max(preemptionReduceNumForOneMap,
+                preemptionReduceNumForPreemptionLimit),
+                preemptionReduceNumForAllMaps);
+
+          LOG.info("Going to preempt " + toPreempt
+              + " due to lack of space for maps");
           assignedRequests.preemptReduce(toPreempt);
         }
       }
@@ -497,7 +525,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       int totalMaps, int completedMaps,
       int scheduledMaps, int scheduledReduces,
       int assignedMaps, int assignedReduces,
-      int mapResourceReqt, int reduceResourceReqt,
+      Resource mapResourceReqt, Resource reduceResourceReqt,
       int numPendingReduces,
       float maxReduceRampupLimit, float reduceSlowStart) {
     
@@ -505,8 +533,12 @@ public class RMContainerAllocator extends RMContainerRequestor
       return;
     }
     
-    int headRoom = getAvailableResources() != null ?
-        getAvailableResources().getMemory() : 0;
+    // get available resources for this job
+    Resource headRoom = getAvailableResources();
+    if (headRoom == null) {
+      headRoom = Resources.none();
+    }
+
     LOG.info("Recalculating schedule, headroom=" + headRoom);
     
     //check for slow start
@@ -540,49 +572,60 @@ public class RMContainerAllocator extends RMContainerRequestor
       completedMapPercent = 1;
     }
     
-    int netScheduledMapMem = 
-        (scheduledMaps + assignedMaps) * mapResourceReqt;
+    Resource netScheduledMapResource =
+        Resources.multiply(mapResourceReqt, (scheduledMaps + assignedMaps));
 
-    int netScheduledReduceMem = 
-        (scheduledReduces + assignedReduces) * reduceResourceReqt;
+    Resource netScheduledReduceResource =
+        Resources.multiply(reduceResourceReqt,
+          (scheduledReduces + assignedReduces));
+
+    Resource finalMapResourceLimit;
+    Resource finalReduceResourceLimit;
 
-    int finalMapMemLimit = 0;
-    int finalReduceMemLimit = 0;
-    
     // ramp up the reduces based on completed map percentage
-    int totalMemLimit = getMemLimit();
-    int idealReduceMemLimit = 
-        Math.min(
-            (int)(completedMapPercent * totalMemLimit),
-            (int) (maxReduceRampupLimit * totalMemLimit));
-    int idealMapMemLimit = totalMemLimit - idealReduceMemLimit;
+    Resource totalResourceLimit = getResourceLimit();
+
+    Resource idealReduceResourceLimit =
+        Resources.multiply(totalResourceLimit,
+          Math.min(completedMapPercent, maxReduceRampupLimit));
+    Resource ideaMapResourceLimit =
+        Resources.subtract(totalResourceLimit, idealReduceResourceLimit);
 
     // check if there aren't enough maps scheduled, give the free map capacity
-    // to reduce
-    if (idealMapMemLimit > netScheduledMapMem) {
-      int unusedMapMemLimit = idealMapMemLimit - netScheduledMapMem;
-      finalReduceMemLimit = idealReduceMemLimit + unusedMapMemLimit;
-      finalMapMemLimit = totalMemLimit - finalReduceMemLimit;
+    // to reduce.
+    // Even when container number equals, there may be unused resources in one
+    // dimension
+    if (ResourceCalculatorUtils.computeAvailableContainers(ideaMapResourceLimit,
+      mapResourceReqt, getSchedulerResourceTypes()) >= (scheduledMaps + assignedMaps)) {
+      // enough resource given to maps, given the remaining to reduces
+      Resource unusedMapResourceLimit =
+          Resources.subtract(ideaMapResourceLimit, netScheduledMapResource);
+      finalReduceResourceLimit =
+          Resources.add(idealReduceResourceLimit, unusedMapResourceLimit);
+      finalMapResourceLimit =
+          Resources.subtract(totalResourceLimit, finalReduceResourceLimit);
     } else {
-      finalMapMemLimit = idealMapMemLimit;
-      finalReduceMemLimit = idealReduceMemLimit;
+      finalMapResourceLimit = ideaMapResourceLimit;
+      finalReduceResourceLimit = idealReduceResourceLimit;
     }
-    
-    LOG.info("completedMapPercent " + completedMapPercent +
-        " totalMemLimit:" + totalMemLimit +
-        " finalMapMemLimit:" + finalMapMemLimit +
-        " finalReduceMemLimit:" + finalReduceMemLimit + 
-        " netScheduledMapMem:" + netScheduledMapMem +
-        " netScheduledReduceMem:" + netScheduledReduceMem);
-    
-    int rampUp = 
-        (finalReduceMemLimit - netScheduledReduceMem) / reduceResourceReqt;
-    
+
+    LOG.info("completedMapPercent " + completedMapPercent
+        + " totalResourceLimit:" + totalResourceLimit
+        + " finalMapResourceLimit:" + finalMapResourceLimit
+        + " finalReduceResourceLimit:" + finalReduceResourceLimit
+        + " netScheduledMapResource:" + netScheduledMapResource
+        + " netScheduledReduceResource:" + netScheduledReduceResource);
+
+    int rampUp =
+        ResourceCalculatorUtils.computeAvailableContainers(Resources.subtract(
+                finalReduceResourceLimit, netScheduledReduceResource),
+            reduceResourceReqt, getSchedulerResourceTypes());
+
     if (rampUp > 0) {
       rampUp = Math.min(rampUp, numPendingReduces);
       LOG.info("Ramping up " + rampUp);
       rampUpReduces(rampUp);
-    } else if (rampUp < 0){
+    } else if (rampUp < 0) {
       int rampDown = -1 * rampUp;
       rampDown = Math.min(rampDown, scheduledReduces);
       LOG.info("Ramping down " + rampDown);
@@ -618,8 +661,10 @@ public class RMContainerAllocator extends RMContainerRequestor
   
   @SuppressWarnings("unchecked")
   private List<Container> getResources() throws Exception {
-    int headRoom = getAvailableResources() != null
-        ? getAvailableResources().getMemory() : 0;//first time it would be null
+    // will be null the first time
+    Resource headRoom =
+        getAvailableResources() == null ? Resources.none() :
+            Resources.clone(getAvailableResources());
     AllocateResponse response;
     /*
      * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
@@ -670,7 +715,9 @@ public class RMContainerAllocator extends RMContainerRequestor
         throw new YarnRuntimeException(msg);
       }
     }
-    int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
+    Resource newHeadRoom =
+        getAvailableResources() == null ? Resources.none()
+            : getAvailableResources();
     List<Container> newContainers = response.getAllocatedContainers();
     // Setting NMTokens
     if (response.getNMTokens() != null) {
@@ -694,10 +741,11 @@ public class RMContainerAllocator extends RMContainerRequestor
           new PreemptionContext(assignedRequests), preemptReq);
     }
 
-    if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) {
+    if (newContainers.size() + finishedContainers.size() > 0
+        || !headRoom.equals(newHeadRoom)) {
       //something changed
       recalculateReduceSchedule = true;
-      if (LOG.isDebugEnabled() && headRoom != newHeadRoom) {
+      if (LOG.isDebugEnabled() && !headRoom.equals(newHeadRoom)) {
         LOG.debug("headroom=" + newHeadRoom);
       }
     }
@@ -802,10 +850,18 @@ public class RMContainerAllocator extends RMContainerRequestor
   }
 
   @Private
-  public int getMemLimit() {
-    int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
-    return headRoom + assignedRequests.maps.size() * mapResourceRequest +
-       assignedRequests.reduces.size() * reduceResourceRequest;
+  public Resource getResourceLimit() {
+    Resource headRoom = getAvailableResources();
+    if (headRoom == null) {
+      headRoom = Resources.none();
+    }
+    Resource assignedMapResource =
+        Resources.multiply(mapResourceRequest, assignedRequests.maps.size());
+    Resource assignedReduceResource =
+        Resources.multiply(reduceResourceRequest,
+          assignedRequests.reduces.size());
+    return Resources.add(headRoom,
+      Resources.add(assignedMapResource, assignedReduceResource));
   }
 
   @Private
@@ -914,10 +970,11 @@ public class RMContainerAllocator extends RMContainerRequestor
         // a container to be assigned
         boolean isAssignable = true;
         Priority priority = allocated.getPriority();
-        int allocatedMemory = allocated.getResource().getMemory();
+        Resource allocatedResource = allocated.getResource();
         if (PRIORITY_FAST_FAIL_MAP.equals(priority) 
             || PRIORITY_MAP.equals(priority)) {
-          if (allocatedMemory < mapResourceRequest
+          if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource,
+              mapResourceRequest, getSchedulerResourceTypes()) <= 0
               || maps.isEmpty()) {
             LOG.info("Cannot assign container " + allocated 
                 + " for a map as either "
@@ -928,7 +985,8 @@ public class RMContainerAllocator extends RMContainerRequestor
           }
         } 
         else if (PRIORITY_REDUCE.equals(priority)) {
-          if (allocatedMemory < reduceResourceRequest
+          if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource,
+              reduceResourceRequest, getSchedulerResourceTypes()) <= 0
               || reduces.isEmpty()) {
             LOG.info("Cannot assign container " + allocated 
                 + " for a reduce as either "

+ 52 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ResourceCalculatorUtils.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.EnumSet;
+
+public class ResourceCalculatorUtils {
+  public static int divideAndCeil(int a, int b) {
+    if (b == 0) {
+      return 0;
+    }
+    return (a + (b - 1)) / b;
+  }
+
+  public static int computeAvailableContainers(Resource available,
+      Resource required, EnumSet<SchedulerResourceTypes> resourceTypes) {
+    if (resourceTypes.contains(SchedulerResourceTypes.CPU)) {
+      return Math.min(available.getMemory() / required.getMemory(),
+        available.getVirtualCores() / required.getVirtualCores());
+    }
+    return available.getMemory() / required.getMemory();
+  }
+
+  public static int divideAndCeilContainers(Resource required, Resource factor,
+      EnumSet<SchedulerResourceTypes> resourceTypes) {
+    if (resourceTypes.contains(SchedulerResourceTypes.CPU)) {
+      return Math.max(divideAndCeil(required.getMemory(), factor.getMemory()),
+        divideAndCeil(required.getVirtualCores(), factor.getVirtualCores()));
+    }
+    return divideAndCeil(required.getMemory(), factor.getMemory());
+  }
+}

+ 54 - 28
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyFloat;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.isA;
@@ -30,19 +31,14 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -458,8 +454,8 @@ public class TestRMContainerAllocator {
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob, new SystemClock());
-    allocator.setMapResourceRequest(1024);
-    allocator.setReduceResourceRequest(1024);
+    allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1));
+    allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1));
     RMContainerAllocator.AssignedRequests assignedRequests =
         allocator.getAssignedRequests();
     RMContainerAllocator.ScheduledRequests scheduledRequests =
@@ -478,7 +474,7 @@ public class TestRMContainerAllocator {
 
   @Test(timeout = 30000)
   public void testNonAggressivelyPreemptReducers() throws Exception {
-    LOG.info("Running testPreemptReducers");
+    LOG.info("Running testNonAggressivelyPreemptReducers");
 
     final int preemptThreshold = 2; //sec
     Configuration conf = new Configuration();
@@ -513,8 +509,8 @@ public class TestRMContainerAllocator {
     clock.setTime(1);
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob, clock);
-    allocator.setMapResourceRequest(1024);
-    allocator.setReduceResourceRequest(1024);
+    allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1));
+    allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1));
     RMContainerAllocator.AssignedRequests assignedRequests =
         allocator.getAssignedRequests();
     RMContainerAllocator.ScheduledRequests scheduledRequests =
@@ -1774,17 +1770,19 @@ public class TestRMContainerAllocator {
     int scheduledReduces = 0;
     int assignedMaps = 2;
     int assignedReduces = 0;
-    int mapResourceReqt = 1024;
-    int reduceResourceReqt = 2*1024;
+    Resource mapResourceReqt = BuilderUtils.newResource(1024, 1);
+    Resource reduceResourceReqt = BuilderUtils.newResource(2 * 1024, 1);
     int numPendingReduces = 4;
     float maxReduceRampupLimit = 0.5f;
     float reduceSlowStart = 0.2f;
     
     RMContainerAllocator allocator = mock(RMContainerAllocator.class);
-    doCallRealMethod().when(allocator).
-        scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), 
-            anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat());
-    
+    doCallRealMethod().when(allocator).scheduleReduces(anyInt(), anyInt(),
+        anyInt(), anyInt(), anyInt(), anyInt(), any(Resource.class),
+        any(Resource.class), anyInt(), anyFloat(), anyFloat());
+    doReturn(EnumSet.of(SchedulerResourceTypes.MEMORY)).when(allocator)
+      .getSchedulerResourceTypes();
+
     // Test slow-start
     allocator.scheduleReduces(
         totalMaps, succeededMaps, 
@@ -1808,6 +1806,7 @@ public class TestRMContainerAllocator {
     verify(allocator, never()).scheduleAllReduces();
 
     succeededMaps = 3;
+    doReturn(BuilderUtils.newResource(0, 0)).when(allocator).getResourceLimit();
     allocator.scheduleReduces(
         totalMaps, succeededMaps, 
         scheduledMaps, scheduledReduces, 
@@ -1818,7 +1817,8 @@ public class TestRMContainerAllocator {
     verify(allocator, times(1)).setIsReduceStarted(true);
     
     // Test reduce ramp-up
-    doReturn(100 * 1024).when(allocator).getMemLimit();
+    doReturn(BuilderUtils.newResource(100 * 1024, 100 * 1)).when(allocator)
+      .getResourceLimit();
     allocator.scheduleReduces(
         totalMaps, succeededMaps, 
         scheduledMaps, scheduledReduces, 
@@ -1831,13 +1831,14 @@ public class TestRMContainerAllocator {
 
     // Test reduce ramp-down
     scheduledReduces = 3;
-    doReturn(10 * 1024).when(allocator).getMemLimit();
+    doReturn(BuilderUtils.newResource(10 * 1024, 10 * 1)).when(allocator)
+      .getResourceLimit();
     allocator.scheduleReduces(
-        totalMaps, succeededMaps, 
-        scheduledMaps, scheduledReduces, 
-        assignedMaps, assignedReduces, 
-        mapResourceReqt, reduceResourceReqt, 
-        numPendingReduces, 
+        totalMaps, succeededMaps,
+        scheduledMaps, scheduledReduces,
+        assignedMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator).rampDownReduces(anyInt());
 
@@ -1846,7 +1847,8 @@ public class TestRMContainerAllocator {
     // should be invoked twice.
     scheduledMaps = 2;
     assignedReduces = 2;
-    doReturn(10 * 1024).when(allocator).getMemLimit();
+    doReturn(BuilderUtils.newResource(10 * 1024, 10 * 1)).when(allocator)
+      .getResourceLimit();
     allocator.scheduleReduces(
         totalMaps, succeededMaps, 
         scheduledMaps, scheduledReduces, 
@@ -1855,6 +1857,30 @@ public class TestRMContainerAllocator {
         numPendingReduces, 
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator, times(2)).rampDownReduces(anyInt());
+
+    doReturn(
+        EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU))
+        .when(allocator).getSchedulerResourceTypes();
+
+    // Test ramp-down when enough memory but not enough cpu resource
+    scheduledMaps = 10;
+    assignedReduces = 0;
+    doReturn(BuilderUtils.newResource(100 * 1024, 5 * 1)).when(allocator)
+        .getResourceLimit();
+    allocator.scheduleReduces(totalMaps, succeededMaps, scheduledMaps,
+        scheduledReduces, assignedMaps, assignedReduces, mapResourceReqt,
+        reduceResourceReqt, numPendingReduces, maxReduceRampupLimit,
+        reduceSlowStart);
+    verify(allocator, times(3)).rampDownReduces(anyInt());
+
+    // Test ramp-down when enough cpu but not enough memory resource
+    doReturn(BuilderUtils.newResource(10 * 1024, 100 * 1)).when(allocator)
+        .getResourceLimit();
+    allocator.scheduleReduces(totalMaps, succeededMaps, scheduledMaps,
+        scheduledReduces, assignedMaps, assignedReduces, mapResourceReqt,
+        reduceResourceReqt, numPendingReduces, maxReduceRampupLimit,
+        reduceSlowStart);
+    verify(allocator, times(4)).rampDownReduces(anyInt());
   }
 
   private static class RecalculateContainerAllocator extends MyContainerAllocator {
@@ -1868,7 +1894,7 @@ public class TestRMContainerAllocator {
     @Override
     public void scheduleReduces(int totalMaps, int completedMaps,
         int scheduledMaps, int scheduledReduces, int assignedMaps,
-        int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
+        int assignedReduces, Resource mapResourceReqt, Resource reduceResourceReqt,
         int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
       recalculatedReduceSchedule = true;
     }

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java

@@ -18,12 +18,12 @@
 
 package org.apache.hadoop.yarn.util.resource;
 
-import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Records;
 
-@Private
+@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
 @Unstable
 public class Resources {