Browse Source

YARN-3542. Refactored existing CPU cgroups support to use the newer and integrated ResourceHandler mechanism, and also deprecated the old LCEResourceHandler inteface hierarchy. Contributed by Varun Vasudev.

Vinod Kumar Vavilapalli (I am also known as @tshooter.) 9 years ago
parent
commit
2085e60a96
13 changed files with 661 additions and 58 deletions
  1. 4 0
      hadoop-yarn-project/CHANGES.txt
  2. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
  4. 24 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
  5. 235 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java
  6. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
  7. 32 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CpuResourceHandler.java
  8. 34 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
  9. 14 54
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
  10. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/DefaultLCEResourcesHandler.java
  11. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/LCEResourcesHandler.java
  12. 297 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsCpuResourceHandlerImpl.java
  13. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -107,6 +107,10 @@ Release 2.9.0 - UNRELEASED
     YARN-4496. Improve HA ResourceManager Failover detection on the client.
     (Jian He via xgong)
 
+    YARN-3542. Refactored existing CPU cgroups support to use the newer and
+    integrated ResourceHandler mechanism, and also deprecated the old
+    LCEResourceHandler inteface hierarchy. (Varun Vasudev via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -938,6 +938,18 @@ public class YarnConfiguration extends Configuration {
       DEFAULT_NM_MEMORY_RESOURCE_CGROUPS_SOFT_LIMIT_PERCENTAGE =
       90.0f;
 
+  @Private
+  public static final String NM_CPU_RESOURCE_PREFIX = NM_PREFIX
+      + "resource.cpu.";
+
+  /** Enable cpu isolation. */
+  @Private
+  public static final String NM_CPU_RESOURCE_ENABLED =
+      NM_CPU_RESOURCE_PREFIX + "enabled";
+
+  @Private
+  public static final boolean DEFAULT_NM_CPU_RESOURCE_ENABLED = false;
+
   /**
    * Prefix for disk configurations. Work in progress: This configuration
    * parameter may be changed/removed in the future.

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java

@@ -111,6 +111,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
         .add(YarnConfiguration.NM_DISK_RESOURCE_ENABLED);
     configurationPrefixToSkipCompare
         .add(YarnConfiguration.NM_MEMORY_RESOURCE_PREFIX);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED);
 
     // Set by container-executor.cfg
     configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);

+ 24 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
+import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler;
 import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
 import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -95,10 +96,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
   public void setConf(Configuration conf) {
     super.setConf(conf);
 
-    resourcesHandler = ReflectionUtils.newInstance(
-        conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
-            DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf);
-    resourcesHandler.setConf(conf);
+    resourcesHandler = getResourcesHandler(conf);
 
     if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY)
         != null) {
@@ -122,6 +120,23 @@ public class LinuxContainerExecutor extends ContainerExecutor {
     }
   }
 
+  private LCEResourcesHandler getResourcesHandler(Configuration conf) {
+    LCEResourcesHandler handler = ReflectionUtils.newInstance(
+        conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
+            DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf);
+
+    // Stop using CgroupsLCEResourcesHandler
+    // use the resource handler chain instead
+    // ResourceHandlerModule will create the cgroup cpu module if
+    // CgroupsLCEResourcesHandler is set
+    if (handler instanceof CgroupsLCEResourcesHandler) {
+      handler =
+          ReflectionUtils.newInstance(DefaultLCEResourcesHandler.class, conf);
+    }
+    handler.setConf(conf);
+    return handler;
+  }
+
   void verifyUsernamePattern(String user) {
     if (!UserGroupInformation.isSecurityEnabled() &&
         !nonsecureLocalUserPattern.matcher(user).matches()) {
@@ -184,7 +199,12 @@ public class LinuxContainerExecutor extends ContainerExecutor {
     try {
       resourceHandlerChain = ResourceHandlerModule
           .getConfiguredResourceHandlerChain(conf);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Resource handler chain enabled = " + (resourceHandlerChain
+            == null));
+      }
       if (resourceHandlerChain != null) {
+        LOG.debug("Bootstrapping resource handler chain");
         resourceHandlerChain.bootstrap(conf);
       }
     } catch (ResourceHandlerException e) {

+ 235 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java

@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An implementation for using CGroups to restrict CPU usage on Linux. The
+ * implementation supports 3 different controls - restrict usage of all YARN
+ * containers, restrict relative usage of individual YARN containers and
+ * restrict usage of individual YARN containers. Admins can set the overall CPU
+ * to be used by all YARN containers - this is implemented by setting
+ * cpu.cfs_period_us and cpu.cfs_quota_us to the ratio desired. If strict
+ * resource usage mode is not enabled, cpu.shares is set for individual
+ * containers - this prevents containers from exceeding the overall limit for
+ * YARN containers but individual containers can use as much of the CPU as
+ * available(under the YARN limit). If strict resource usage is enabled, then
+ * container can only use the percentage of CPU allocated to them and this is
+ * again implemented using cpu.cfs_period_us and cpu.cfs_quota_us.
+ *
+ */
+@InterfaceStability.Unstable
+@InterfaceAudience.Private
+public class CGroupsCpuResourceHandlerImpl implements CpuResourceHandler {
+
+  static final Log LOG = LogFactory.getLog(CGroupsCpuResourceHandlerImpl.class);
+
+  private CGroupsHandler cGroupsHandler;
+  private boolean strictResourceUsageMode = false;
+  private float yarnProcessors;
+  private int nodeVCores;
+  private static final CGroupsHandler.CGroupController CPU =
+      CGroupsHandler.CGroupController.CPU;
+
+  @VisibleForTesting
+  static final int MAX_QUOTA_US = 1000 * 1000;
+  @VisibleForTesting
+  static final int MIN_PERIOD_US = 1000;
+  @VisibleForTesting
+  static final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel
+
+  CGroupsCpuResourceHandlerImpl(CGroupsHandler cGroupsHandler) {
+    this.cGroupsHandler = cGroupsHandler;
+  }
+
+  @Override
+  public List<PrivilegedOperation> bootstrap(Configuration conf)
+      throws ResourceHandlerException {
+    return bootstrap(
+        ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf), conf);
+  }
+
+  @VisibleForTesting
+  List<PrivilegedOperation> bootstrap(
+      ResourceCalculatorPlugin plugin, Configuration conf)
+      throws ResourceHandlerException {
+    this.strictResourceUsageMode = conf.getBoolean(
+        YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
+        YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE);
+    this.cGroupsHandler.mountCGroupController(CPU);
+    nodeVCores = NodeManagerHardwareUtils.getVCores(plugin, conf);
+
+    // cap overall usage to the number of cores allocated to YARN
+    yarnProcessors = NodeManagerHardwareUtils.getContainersCPUs(plugin, conf);
+    int systemProcessors = NodeManagerHardwareUtils.getNodeCPUs(plugin, conf);
+    boolean existingCpuLimits;
+    try {
+      existingCpuLimits =
+          cpuLimitsExist(cGroupsHandler.getPathForCGroup(CPU, ""));
+    } catch (IOException ie) {
+      throw new ResourceHandlerException(ie);
+    }
+    if (systemProcessors != (int) yarnProcessors) {
+      LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
+      int[] limits = getOverallLimits(yarnProcessors);
+      cGroupsHandler
+          .updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_PERIOD_US,
+              String.valueOf(limits[0]));
+      cGroupsHandler
+          .updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_QUOTA_US,
+              String.valueOf(limits[1]));
+    } else if (existingCpuLimits) {
+      LOG.info("Removing CPU constraints for YARN containers.");
+      cGroupsHandler
+          .updateCGroupParam(CPU, "", CGroupsHandler.CGROUP_CPU_QUOTA_US,
+              String.valueOf(-1));
+    }
+    return null;
+  }
+
+  @InterfaceAudience.Private
+  public static boolean cpuLimitsExist(String path)
+      throws IOException {
+    File quotaFile = new File(path,
+        CPU.getName() + "." + CGroupsHandler.CGROUP_CPU_QUOTA_US);
+    if (quotaFile.exists()) {
+      String contents = FileUtils.readFileToString(quotaFile, "UTF-8");
+      int quotaUS = Integer.parseInt(contents.trim());
+      if (quotaUS != -1) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  public static int[] getOverallLimits(float yarnProcessors) {
+
+    int[] ret = new int[2];
+
+    if (yarnProcessors < 0.01f) {
+      throw new IllegalArgumentException("Number of processors can't be <= 0.");
+    }
+
+    int quotaUS = MAX_QUOTA_US;
+    int periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
+    if (yarnProcessors < 1.0f) {
+      periodUS = MAX_QUOTA_US;
+      quotaUS = (int) (periodUS * yarnProcessors);
+      if (quotaUS < MIN_PERIOD_US) {
+        LOG.warn("The quota calculated for the cgroup was too low."
+            + " The minimum value is " + MIN_PERIOD_US
+            + ", calculated value is " + quotaUS
+            + ". Setting quota to minimum value.");
+        quotaUS = MIN_PERIOD_US;
+      }
+    }
+
+    // cfs_period_us can't be less than 1000 microseconds
+    // if the value of periodUS is less than 1000, we can't really use cgroups
+    // to limit cpu
+    if (periodUS < MIN_PERIOD_US) {
+      LOG.warn("The period calculated for the cgroup was too low."
+          + " The minimum value is " + MIN_PERIOD_US
+          + ", calculated value is " + periodUS
+          + ". Using all available CPU.");
+      periodUS = MAX_QUOTA_US;
+      quotaUS = -1;
+    }
+
+    ret[0] = periodUS;
+    ret[1] = quotaUS;
+    return ret;
+  }
+
+  @Override
+  public List<PrivilegedOperation> preStart(Container container)
+      throws ResourceHandlerException {
+
+    String cgroupId = container.getContainerId().toString();
+    Resource containerResource = container.getResource();
+    cGroupsHandler.createCGroup(CPU, cgroupId);
+    try {
+      int containerVCores = containerResource.getVirtualCores();
+      int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores;
+      cGroupsHandler
+          .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_SHARES,
+              String.valueOf(cpuShares));
+      if (strictResourceUsageMode) {
+        if (nodeVCores != containerVCores) {
+          float containerCPU =
+              (containerVCores * yarnProcessors) / (float) nodeVCores;
+          int[] limits = getOverallLimits(containerCPU);
+          cGroupsHandler.updateCGroupParam(CPU, cgroupId,
+              CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf(limits[0]));
+          cGroupsHandler.updateCGroupParam(CPU, cgroupId,
+              CGroupsHandler.CGROUP_CPU_QUOTA_US, String.valueOf(limits[1]));
+        }
+      }
+    } catch (ResourceHandlerException re) {
+      cGroupsHandler.deleteCGroup(CPU, cgroupId);
+      LOG.warn("Could not update cgroup for container", re);
+      throw re;
+    }
+    List<PrivilegedOperation> ret = new ArrayList<>();
+    ret.add(new PrivilegedOperation(
+        PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+        PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupsHandler
+            .getPathForCGroupTasks(CPU, cgroupId)));
+    return ret;
+  }
+
+  @Override
+  public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
+      throws ResourceHandlerException {
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> postComplete(ContainerId containerId)
+      throws ResourceHandlerException {
+    cGroupsHandler.deleteCGroup(CPU, containerId.toString());
+    return null;
+  }
+
+  @Override public List<PrivilegedOperation> teardown()
+      throws ResourceHandlerException {
+    return null;
+  }
+}

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java

@@ -58,6 +58,10 @@ public interface CGroupsHandler {
   String CGROUP_PARAM_MEMORY_SWAPPINESS = "swappiness";
 
 
+  String CGROUP_CPU_PERIOD_US = "cfs_period_us";
+  String CGROUP_CPU_QUOTA_US = "cfs_quota_us";
+  String CGROUP_CPU_SHARES = "shares";
+
   /**
    * Mounts a cgroup controller
    * @param controller - the controller being mounted

+ 32 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CpuResourceHandler.java

@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Resource handler for cpu resources.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface CpuResourceHandler extends ResourceHandler {
+
+}

+ 34 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java

@@ -21,11 +21,15 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler;
+import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -38,6 +42,7 @@ import java.util.List;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class ResourceHandlerModule {
+  static final Log LOG = LogFactory.getLog(ResourceHandlerModule.class);
   private static volatile ResourceHandlerChain resourceHandlerChain;
 
   /**
@@ -52,6 +57,8 @@ public class ResourceHandlerModule {
       cGroupsBlkioResourceHandler;
   private static volatile CGroupsMemoryResourceHandlerImpl
       cGroupsMemoryResourceHandler;
+  private static volatile CGroupsCpuResourceHandlerImpl
+      cGroupsCpuResourceHandler;
 
   /**
    * Returns an initialized, thread-safe CGroupsHandler instance.
@@ -70,6 +77,30 @@ public class ResourceHandlerModule {
     return cGroupsHandler;
   }
 
+  private static CGroupsCpuResourceHandlerImpl getcGroupsCpuResourceHandler(
+      Configuration conf) throws ResourceHandlerException {
+    boolean cgroupsCpuEnabled =
+        conf.getBoolean(YarnConfiguration.NM_CPU_RESOURCE_ENABLED,
+            YarnConfiguration.DEFAULT_NM_CPU_RESOURCE_ENABLED);
+    boolean cgroupsLCEResourcesHandlerEnabled =
+        conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
+            DefaultLCEResourcesHandler.class)
+            .equals(CgroupsLCEResourcesHandler.class);
+    if (cgroupsCpuEnabled || cgroupsLCEResourcesHandlerEnabled) {
+      if (cGroupsCpuResourceHandler == null) {
+        synchronized (CpuResourceHandler.class) {
+          if (cGroupsCpuResourceHandler == null) {
+            LOG.debug("Creating new cgroups cpu handler");
+            cGroupsCpuResourceHandler =
+                new CGroupsCpuResourceHandlerImpl(getCGroupsHandler(conf));
+            return cGroupsCpuResourceHandler;
+          }
+        }
+      }
+    }
+    return null;
+  }
+
   private static TrafficControlBandwidthHandlerImpl
     getTrafficControlBandwidthHandler(Configuration conf)
       throws ResourceHandlerException {
@@ -78,6 +109,7 @@ public class ResourceHandlerModule {
       if (trafficControlBandwidthHandler == null) {
         synchronized (OutboundBandwidthResourceHandler.class) {
           if (trafficControlBandwidthHandler == null) {
+            LOG.debug("Creating new traffic control bandwidth handler");
             trafficControlBandwidthHandler = new
                 TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor
                 .getInstance(conf), getCGroupsHandler(conf),
@@ -113,6 +145,7 @@ public class ResourceHandlerModule {
     if (cGroupsBlkioResourceHandler == null) {
       synchronized (DiskResourceHandler.class) {
         if (cGroupsBlkioResourceHandler == null) {
+          LOG.debug("Creating new cgroups blkio handler");
           cGroupsBlkioResourceHandler =
               new CGroupsBlkioResourceHandlerImpl(getCGroupsHandler(conf));
         }
@@ -158,6 +191,7 @@ public class ResourceHandlerModule {
     addHandlerIfNotNull(handlerList, getOutboundBandwidthResourceHandler(conf));
     addHandlerIfNotNull(handlerList, getDiskResourceHandler(conf));
     addHandlerIfNotNull(handlerList, getMemoryResourceHandler(conf));
+    addHandlerIfNotNull(handlerList, getcGroupsCpuResourceHandler(conf));
     resourceHandlerChain = new ResourceHandlerChain(handlerList);
   }
 

+ 14 - 54
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java

@@ -39,7 +39,6 @@ import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -50,10 +49,20 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsCpuResourceHandlerImpl;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.SystemClock;
 
+/**
+ * Resource handler that lets you setup cgroups
+ * to to handle cpu isolation. Please look at the ResourceHandlerModule
+ * and CGroupsCpuResourceHandlerImpl classes which let you isolate multiple
+ * resources using cgroups.
+ * Deprecated - please look at ResourceHandlerModule and
+ * CGroupsCpuResourceHandlerImpl
+ */
+@Deprecated
 public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
 
   final static Log LOG = LogFactory
@@ -73,8 +82,6 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
   private final String CPU_PERIOD_US = "cfs_period_us";
   private final String CPU_QUOTA_US = "cfs_quota_us";
   private final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel
-  private final int MAX_QUOTA_US = 1000 * 1000;
-  private final int MIN_PERIOD_US = 1000;
   private final Map<String, String> controllerPaths; // Controller -> path
 
   private long deleteCgroupTimeout;
@@ -163,65 +170,18 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
       int[] limits = getOverallLimits(yarnProcessors);
       updateCgroup(CONTROLLER_CPU, "", CPU_PERIOD_US, String.valueOf(limits[0]));
       updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(limits[1]));
-    } else if (cpuLimitsExist()) {
+    } else if (CGroupsCpuResourceHandlerImpl.cpuLimitsExist(
+        pathForCgroup(CONTROLLER_CPU, ""))) {
       LOG.info("Removing CPU constraints for YARN containers.");
       updateCgroup(CONTROLLER_CPU, "", CPU_QUOTA_US, String.valueOf(-1));
     }
   }
 
-  boolean cpuLimitsExist() throws IOException {
-    String path = pathForCgroup(CONTROLLER_CPU, "");
-    File quotaFile = new File(path, CONTROLLER_CPU + "." + CPU_QUOTA_US);
-    if (quotaFile.exists()) {
-      String contents = FileUtils.readFileToString(quotaFile, "UTF-8");
-      int quotaUS = Integer.parseInt(contents.trim());
-      if (quotaUS != -1) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @VisibleForTesting
   int[] getOverallLimits(float yarnProcessors) {
-
-    int[] ret = new int[2];
-
-    if (yarnProcessors < 0.01f) {
-      throw new IllegalArgumentException("Number of processors can't be <= 0.");
-    }
-
-    int quotaUS = MAX_QUOTA_US;
-    int periodUS = (int) (MAX_QUOTA_US / yarnProcessors);
-    if (yarnProcessors < 1.0f) {
-      periodUS = MAX_QUOTA_US;
-      quotaUS = (int) (periodUS * yarnProcessors);
-      if (quotaUS < MIN_PERIOD_US) {
-        LOG
-          .warn("The quota calculated for the cgroup was too low. The minimum value is "
-              + MIN_PERIOD_US + ", calculated value is " + quotaUS
-              + ". Setting quota to minimum value.");
-        quotaUS = MIN_PERIOD_US;
-      }
-    }
-
-    // cfs_period_us can't be less than 1000 microseconds
-    // if the value of periodUS is less than 1000, we can't really use cgroups
-    // to limit cpu
-    if (periodUS < MIN_PERIOD_US) {
-      LOG
-        .warn("The period calculated for the cgroup was too low. The minimum value is "
-            + MIN_PERIOD_US + ", calculated value is " + periodUS
-            + ". Using all available CPU.");
-      periodUS = MAX_QUOTA_US;
-      quotaUS = -1;
-    }
-
-    ret[0] = periodUS;
-    ret[1] = quotaUS;
-    return ret;
+    return CGroupsCpuResourceHandlerImpl.getOverallLimits(yarnProcessors);
   }
 
+
   boolean isCpuWeightEnabled() {
     return this.cpuWeightEnabled;
   }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/DefaultLCEResourcesHandler.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
 
+@Deprecated
 public class DefaultLCEResourcesHandler implements LCEResourcesHandler {
 
   final static Log LOG = LogFactory

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/LCEResourcesHandler.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
 
+@Deprecated
 public interface LCEResourcesHandler extends Configurable {
 
   void init(LinuxContainerExecutor lce) throws IOException;

+ 297 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsCpuResourceHandlerImpl.java

@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.List;
+
+import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.times;
+
+public class TestCGroupsCpuResourceHandlerImpl {
+
+  private CGroupsHandler mockCGroupsHandler;
+  private CGroupsCpuResourceHandlerImpl cGroupsCpuResourceHandler;
+  private ResourceCalculatorPlugin plugin;
+  final int numProcessors = 4;
+
+  @Before
+  public void setup() {
+    mockCGroupsHandler = mock(CGroupsHandler.class);
+    cGroupsCpuResourceHandler =
+        new CGroupsCpuResourceHandlerImpl(mockCGroupsHandler);
+
+    plugin = mock(ResourceCalculatorPlugin.class);
+    Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
+    Mockito.doReturn(numProcessors).when(plugin).getNumCores();
+  }
+
+  @Test
+  public void testBootstrap() throws Exception {
+    Configuration conf = new YarnConfiguration();
+
+    List<PrivilegedOperation> ret =
+        cGroupsCpuResourceHandler.bootstrap(plugin, conf);
+    verify(mockCGroupsHandler, times(1))
+        .mountCGroupController(CGroupsHandler.CGroupController.CPU);
+    verify(mockCGroupsHandler, times(0))
+        .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
+            CGroupsHandler.CGROUP_CPU_PERIOD_US, "");
+    verify(mockCGroupsHandler, times(0))
+        .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
+            CGroupsHandler.CGROUP_CPU_QUOTA_US, "");
+    Assert.assertNull(ret);
+  }
+
+  @Test
+  public void testBootstrapLimits() throws Exception {
+    Configuration conf = new YarnConfiguration();
+
+    int cpuPerc = 80;
+    conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
+        cpuPerc);
+    int period = (CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US * 100) / (cpuPerc
+        * numProcessors);
+    List<PrivilegedOperation> ret =
+        cGroupsCpuResourceHandler.bootstrap(plugin, conf);
+    verify(mockCGroupsHandler, times(1))
+        .mountCGroupController(CGroupsHandler.CGroupController.CPU);
+    verify(mockCGroupsHandler, times(1))
+        .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
+            CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf(period));
+    verify(mockCGroupsHandler, times(1))
+        .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
+            CGroupsHandler.CGROUP_CPU_QUOTA_US,
+            String.valueOf(CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US));
+    Assert.assertNull(ret);
+  }
+
+  @Test
+  public void testBootstrapExistingLimits() throws Exception {
+    File existingLimit = new File(CGroupsHandler.CGroupController.CPU.getName()
+        + "." + CGroupsHandler.CGROUP_CPU_QUOTA_US);
+    try {
+      FileUtils.write(existingLimit, "10000"); // value doesn't matter
+      when(mockCGroupsHandler
+          .getPathForCGroup(CGroupsHandler.CGroupController.CPU, ""))
+          .thenReturn(".");
+      Configuration conf = new YarnConfiguration();
+
+      List<PrivilegedOperation> ret =
+          cGroupsCpuResourceHandler.bootstrap(plugin, conf);
+      verify(mockCGroupsHandler, times(1))
+          .mountCGroupController(CGroupsHandler.CGroupController.CPU);
+      verify(mockCGroupsHandler, times(1))
+          .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
+              CGroupsHandler.CGROUP_CPU_QUOTA_US, "-1");
+      Assert.assertNull(ret);
+    } finally {
+      FileUtils.deleteQuietly(existingLimit);
+    }
+  }
+
+  @Test
+  public void testPreStart() throws Exception {
+    String id = "container_01_01";
+    String path = "test-path/" + id;
+    ContainerId mockContainerId = mock(ContainerId.class);
+    when(mockContainerId.toString()).thenReturn(id);
+    Container mockContainer = mock(Container.class);
+    when(mockContainer.getContainerId()).thenReturn(mockContainerId);
+    when(mockCGroupsHandler
+        .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
+        .thenReturn(path);
+    when(mockContainer.getResource()).thenReturn(Resource.newInstance(1024, 2));
+
+    List<PrivilegedOperation> ret =
+        cGroupsCpuResourceHandler.preStart(mockContainer);
+    verify(mockCGroupsHandler, times(1))
+        .createCGroup(CGroupsHandler.CGroupController.CPU, id);
+    verify(mockCGroupsHandler, times(1))
+        .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
+            CGroupsHandler.CGROUP_CPU_SHARES, String
+                .valueOf(CGroupsCpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT * 2));
+
+    // don't set quota or period
+    verify(mockCGroupsHandler, never())
+        .updateCGroupParam(eq(CGroupsHandler.CGroupController.CPU), eq(id),
+            eq(CGroupsHandler.CGROUP_CPU_PERIOD_US), anyString());
+    verify(mockCGroupsHandler, never())
+        .updateCGroupParam(eq(CGroupsHandler.CGroupController.CPU), eq(id),
+            eq(CGroupsHandler.CGROUP_CPU_QUOTA_US), anyString());
+    Assert.assertNotNull(ret);
+    Assert.assertEquals(1, ret.size());
+    PrivilegedOperation op = ret.get(0);
+    Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+        op.getOperationType());
+    List<String> args = op.getArguments();
+    Assert.assertEquals(1, args.size());
+    Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path,
+        args.get(0));
+  }
+
+  @Test
+  public void testPreStartStrictUsage() throws Exception {
+    String id = "container_01_01";
+    String path = "test-path/" + id;
+    ContainerId mockContainerId = mock(ContainerId.class);
+    when(mockContainerId.toString()).thenReturn(id);
+    Container mockContainer = mock(Container.class);
+    when(mockContainer.getContainerId()).thenReturn(mockContainerId);
+    when(mockCGroupsHandler
+        .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
+        .thenReturn(path);
+    when(mockContainer.getResource()).thenReturn(Resource.newInstance(1024, 1));
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(
+        YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
+        true);
+    cGroupsCpuResourceHandler.bootstrap(plugin, conf);
+    int defaultVCores = 8;
+    float share = (float) numProcessors / (float) defaultVCores;
+    List<PrivilegedOperation> ret =
+        cGroupsCpuResourceHandler.preStart(mockContainer);
+    verify(mockCGroupsHandler, times(1))
+        .createCGroup(CGroupsHandler.CGroupController.CPU, id);
+    verify(mockCGroupsHandler, times(1))
+        .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
+            CGroupsHandler.CGROUP_CPU_SHARES,
+            String.valueOf(CGroupsCpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT));
+    // set quota and period
+    verify(mockCGroupsHandler, times(1))
+        .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
+            CGroupsHandler.CGROUP_CPU_PERIOD_US,
+            String.valueOf(CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US));
+    verify(mockCGroupsHandler, times(1))
+        .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
+            CGroupsHandler.CGROUP_CPU_QUOTA_US, String.valueOf(
+                (int) (CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US * share)));
+    Assert.assertNotNull(ret);
+    Assert.assertEquals(1, ret.size());
+    PrivilegedOperation op = ret.get(0);
+    Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+        op.getOperationType());
+    List<String> args = op.getArguments();
+    Assert.assertEquals(1, args.size());
+    Assert.assertEquals(PrivilegedOperation.CGROUP_ARG_PREFIX + path,
+        args.get(0));
+  }
+
+  @Test
+  public void testPreStartRestrictedContainers() throws Exception {
+    String id = "container_01_01";
+    String path = "test-path/" + id;
+    int defaultVCores = 8;
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(
+        YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
+        true);
+    int cpuPerc = 75;
+    conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
+        cpuPerc);
+    cGroupsCpuResourceHandler.bootstrap(plugin, conf);
+    verify(mockCGroupsHandler, times(1))
+        .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
+            CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf("333333"));
+    verify(mockCGroupsHandler, times(1))
+        .updateCGroupParam(CGroupsHandler.CGroupController.CPU, "",
+            CGroupsHandler.CGROUP_CPU_QUOTA_US,
+            String.valueOf(CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US));
+    float yarnCores = (cpuPerc * numProcessors) / 100;
+    int[] containerVCores = { 2, 4 };
+    for (int cVcores : containerVCores) {
+      ContainerId mockContainerId = mock(ContainerId.class);
+      when(mockContainerId.toString()).thenReturn(id);
+      Container mockContainer = mock(Container.class);
+      when(mockContainer.getContainerId()).thenReturn(mockContainerId);
+      when(mockCGroupsHandler
+          .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
+          .thenReturn(path);
+      when(mockContainer.getResource())
+          .thenReturn(Resource.newInstance(1024, cVcores));
+      when(mockCGroupsHandler
+          .getPathForCGroupTasks(CGroupsHandler.CGroupController.CPU, id))
+          .thenReturn(path);
+
+      float share = (cVcores * yarnCores) / defaultVCores;
+      int quotaUS;
+      int periodUS;
+      if (share > 1.0f) {
+        quotaUS = CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US;
+        periodUS =
+            (int) ((float) CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US / share);
+      } else {
+        quotaUS = (int) (CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US * share);
+        periodUS = CGroupsCpuResourceHandlerImpl.MAX_QUOTA_US;
+      }
+      cGroupsCpuResourceHandler.preStart(mockContainer);
+      verify(mockCGroupsHandler, times(1))
+          .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
+              CGroupsHandler.CGROUP_CPU_SHARES, String.valueOf(
+              CGroupsCpuResourceHandlerImpl.CPU_DEFAULT_WEIGHT * cVcores));
+      // set quota and period
+      verify(mockCGroupsHandler, times(1))
+          .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
+              CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf(periodUS));
+      verify(mockCGroupsHandler, times(1))
+          .updateCGroupParam(CGroupsHandler.CGroupController.CPU, id,
+              CGroupsHandler.CGROUP_CPU_QUOTA_US, String.valueOf(quotaUS));
+    }
+  }
+
+  @Test
+  public void testReacquireContainer() throws Exception {
+    ContainerId containerIdMock = mock(ContainerId.class);
+    Assert.assertNull(
+        cGroupsCpuResourceHandler.reacquireContainer(containerIdMock));
+  }
+
+  @Test
+  public void testPostComplete() throws Exception {
+    String id = "container_01_01";
+    ContainerId mockContainerId = mock(ContainerId.class);
+    when(mockContainerId.toString()).thenReturn(id);
+    Assert.assertNull(cGroupsCpuResourceHandler.postComplete(mockContainerId));
+    verify(mockCGroupsHandler, times(1))
+        .deleteCGroup(CGroupsHandler.CGroupController.CPU, id);
+  }
+
+  @Test
+  public void testTeardown() throws Exception {
+    Assert.assertNull(cGroupsCpuResourceHandler.teardown());
+  }
+
+  @Test
+  public void testStrictResourceUsage() throws Exception {
+    Assert.assertNull(cGroupsCpuResourceHandler.teardown());
+  }
+}

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java

@@ -37,6 +37,7 @@ import java.util.List;
 import java.util.Scanner;
 import java.util.concurrent.CountDownLatch;
 
+@Deprecated
 public class TestCgroupsLCEResourcesHandler {
   static File cgroupDir = null;