
YARN-3443. Create a 'ResourceHandler' subsystem to ease addition of support for new resource types on the NM. Contributed by Sidharta Seethana.

Junping Du · 10 years ago
commit 838b06ac87
11 changed files with 1736 additions and 0 deletions
  1. + 3 - 0
      hadoop-yarn-project/CHANGES.txt
  2. + 119 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java
  3. + 43 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationException.java
  4. + 255 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java
  5. + 132 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java
  6. + 436 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java
  7. + 91 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandler.java
  8. + 142 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerChain.java
  9. + 47 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerException.java
  10. + 233 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/TestPrivilegedOperationExecutor.java
  11. + 235 - 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsHandlerImpl.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -69,6 +69,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3347. Improve YARN log command to get AMContainer logs as well as 
     running containers logs. (Xuan Gong via junping_du)
 
+    YARN-3443. Create a 'ResourceHandler' subsystem to ease addition of support 
+    for new resource types on the NM. (Sidharta Seethana via junping_du)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

+ 119 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperation.java

@@ -0,0 +1,119 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Represents operations that require higher system privileges - e.g.
+ * creating cgroups, launching containers as specified users, 'tc' commands,
+ * etc. - that are completed using the container-executor binary.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class PrivilegedOperation {
+
+  public enum OperationType {
+    CHECK_SETUP("--checksetup"),
+    MOUNT_CGROUPS("--mount-cgroups"),
+    INITIALIZE_CONTAINER(""), //no CLI switch supported yet
+    LAUNCH_CONTAINER(""), //no CLI switch supported yet
+    SIGNAL_CONTAINER(""), //no CLI switch supported yet
+    DELETE_AS_USER(""), //no CLI switch supported yet
+    TC_MODIFY_STATE("--tc-modify-state"),
+    TC_READ_STATE("--tc-read-state"),
+    TC_READ_STATS("--tc-read-stats"),
+    ADD_PID_TO_CGROUP(""); //no CLI switch supported yet.
+
+    private final String option;
+
+    OperationType(String option) {
+      this.option = option;
+    }
+
+    public String getOption() {
+      return option;
+    }
+  }
+
+  public static final String CGROUP_ARG_PREFIX = "cgroups=";
+
+  private final OperationType opType;
+  private final List<String> args;
+
+  public PrivilegedOperation(OperationType opType, String arg) {
+    this.opType = opType;
+    this.args = new ArrayList<String>();
+
+    if (arg != null) {
+      this.args.add(arg);
+    }
+  }
+
+  public PrivilegedOperation(OperationType opType, List<String> args) {
+    this.opType = opType;
+    this.args = new ArrayList<String>();
+
+    if (args != null) {
+      this.args.addAll(args);
+    }
+  }
+
+  public void appendArgs(String... args) {
+    for (String arg : args) {
+      this.args.add(arg);
+    }
+  }
+
+  public void appendArgs(List<String> args) {
+    this.args.addAll(args);
+  }
+
+  public OperationType getOperationType() {
+    return opType;
+  }
+
+  public List<String> getArguments() {
+    return Collections.unmodifiableList(this.args);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !(other instanceof PrivilegedOperation)) {
+      return false;
+    }
+
+    PrivilegedOperation otherOp = (PrivilegedOperation) other;
+
+    return otherOp.opType.equals(opType) && otherOp.args.equals(args);
+  }
+
+  @Override
+  public int hashCode() {
+    return opType.hashCode() + 97 * args.hashCode();
+  }
+}
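
A minimal usage sketch (not part of the commit) may help make the API above concrete. It relies only on the constructor, appendArgs() and getArguments() defined in PrivilegedOperation; the hierarchy and controller=path argument values are invented for illustration:

    import java.util.List;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;

    class PrivilegedOperationSketch {
      static PrivilegedOperation mountCpuControllerOp() {
        // Assemble a --mount-cgroups operation; the argument values
        // ("hadoop-yarn", "cpu=/sys/fs/cgroup/cpu") are hypothetical.
        PrivilegedOperation op = new PrivilegedOperation(
            PrivilegedOperation.OperationType.MOUNT_CGROUPS, (String) null);
        op.appendArgs("hadoop-yarn", "cpu=/sys/fs/cgroup/cpu");

        // getArguments() exposes an unmodifiable view of the accumulated args.
        List<String> args = op.getArguments();
        assert args.size() == 2;
        return op;
      }
    }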

+ 43 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationException.java

@@ -0,0 +1,43 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public class PrivilegedOperationException extends YarnException {
+  private static final long serialVersionUID = 1L;
+
+  public PrivilegedOperationException() {
+    super();
+  }
+
+  public PrivilegedOperationException(String message) {
+    super(message);
+  }
+
+  public PrivilegedOperationException(Throwable cause) {
+    super(cause);
+  }
+
+  public PrivilegedOperationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

+ 255 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/PrivilegedOperationExecutor.java

@@ -0,0 +1,255 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provides mechanisms to execute PrivilegedOperations.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class PrivilegedOperationExecutor {
+  private static final Log LOG = LogFactory.getLog(PrivilegedOperationExecutor
+      .class);
+  private volatile static PrivilegedOperationExecutor instance;
+
+  private String containerExecutorExe;
+
+  public static String getContainerExecutorExecutablePath(Configuration conf) {
+    String yarnHomeEnvVar =
+        System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key());
+    File hadoopBin = new File(yarnHomeEnvVar, "bin");
+    String defaultPath =
+        new File(hadoopBin, "container-executor").getAbsolutePath();
+    return null == conf
+        ? defaultPath
+        : conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH,
+        defaultPath);
+  }
+
+  private void init(Configuration conf) {
+    containerExecutorExe = getContainerExecutorExecutablePath(conf);
+  }
+
+  private PrivilegedOperationExecutor(Configuration conf) {
+    init(conf);
+  }
+
+  public static PrivilegedOperationExecutor getInstance(Configuration conf) {
+    if (instance == null) {
+      synchronized (PrivilegedOperationExecutor.class) {
+        if (instance == null) {
+          instance = new PrivilegedOperationExecutor(conf);
+        }
+      }
+    }
+
+    return instance;
+  }
+
+  /**
+   * @param prefixCommands in some cases (e.g. priorities using nice),
+   *                       prefix commands are necessary
+   * @param operation      the type and arguments for the operation to be
+   *                       executed
+   * @return execution string array for the privileged operation
+   */
+
+  public String[] getPrivilegedOperationExecutionCommand(List<String>
+      prefixCommands,
+      PrivilegedOperation operation) {
+    List<String> fullCommand = new ArrayList<String>();
+
+    if (prefixCommands != null && !prefixCommands.isEmpty()) {
+      fullCommand.addAll(prefixCommands);
+    }
+
+    fullCommand.add(containerExecutorExe);
+    fullCommand.add(operation.getOperationType().getOption());
+    fullCommand.addAll(operation.getArguments());
+
+    String[] fullCommandArray =
+        fullCommand.toArray(new String[fullCommand.size()]);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Privileged Execution Command Array: " +
+          Arrays.toString(fullCommandArray));
+    }
+
+    return fullCommandArray;
+  }
+
+  /**
+   * Executes a privileged operation. It is up to the callers to ensure that
+   * each privileged operation's parameters are constructed correctly. The
+   * parameters are passed verbatim to the container-executor binary.
+   *
+   * @param prefixCommands in some cases (e.g. priorities using nice),
+   *                       prefix commands are necessary
+   * @param operation      the type and arguments for the operation to be executed
+   * @param workingDir     (optional) working directory for execution
+   * @param env            (optional) env of the command will include specified vars
+   * @param grabOutput     return (possibly large) shell command output
+   * @return stdout contents from shell executor - useful for some privileged
+   * operations - e.g. --tc-read-state
+   * @throws org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException
+   */
+  public String executePrivilegedOperation(List<String> prefixCommands,
+      PrivilegedOperation operation, File workingDir,
+      Map<String, String> env, boolean grabOutput)
+      throws PrivilegedOperationException {
+    String[] fullCommandArray = getPrivilegedOperationExecutionCommand
+        (prefixCommands, operation);
+    ShellCommandExecutor exec = new ShellCommandExecutor(fullCommandArray,
+        workingDir, env);
+
+    try {
+      exec.execute();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Privileged Execution Operation Output:");
+        LOG.debug(exec.getOutput());
+      }
+    } catch (ExitCodeException e) {
+      String logLine = new StringBuffer("Shell execution returned exit code: ")
+          .append(exec.getExitCode())
+          .append(". Privileged Execution Operation Output: ")
+          .append(System.lineSeparator()).append(exec.getOutput()).toString();
+
+      LOG.warn(logLine);
+      throw new PrivilegedOperationException(e);
+    } catch (IOException e) {
+      LOG.warn("IOException executing command: ", e);
+      throw new PrivilegedOperationException(e);
+    }
+
+    if (grabOutput) {
+      return exec.getOutput();
+    }
+
+    return null;
+  }
+
+  /**
+   * Executes a privileged operation. It is up to the callers to ensure that
+   * each privileged operation's parameters are constructed correctly. The
+   * parameters are passed verbatim to the container-executor binary.
+   *
+   * @param operation  the type and arguments for the operation to be executed
+   * @param grabOutput return (possibly large) shell command output
+   * @return stdout contents from shell executor - useful for some privileged
+   * operations - e.g. --tc-read-state
+   * @throws org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException
+   */
+  public String executePrivilegedOperation(PrivilegedOperation operation,
+      boolean grabOutput) throws PrivilegedOperationException {
+    return executePrivilegedOperation(null, operation, null, null, grabOutput);
+  }
+
+  //Utility functions for squashing together operations in supported ways
+  //At some point, we need to create a generalized mechanism that uses a set
+  //of squashing 'rules' to squash a set of PrivilegedOperations of varying
+  //types - e.g. Launch Container + Add Pid to CGroup(s) + TC rules
+
+  /**
+   * Squash operations for cgroups - e.g. mount, add pid to cgroup, etc.
+   * For now, we only implement squashing for 'add pid to cgroup' since this
+   * is the only optimization relevant to launching containers.
+   *
+   * @return single squashed cgroup operation. Null on failure.
+   */
+
+  public static PrivilegedOperation squashCGroupOperations
+  (List<PrivilegedOperation> ops) throws PrivilegedOperationException {
+    if (ops.size() == 0) {
+      return null;
+    }
+
+    StringBuffer finalOpArg = new StringBuffer(PrivilegedOperation
+        .CGROUP_ARG_PREFIX);
+    boolean noneArgsOnly = true;
+
+    for (PrivilegedOperation op : ops) {
+      if (!op.getOperationType()
+          .equals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP)) {
+        LOG.warn("Unsupported operation type: " + op.getOperationType());
+        throw new PrivilegedOperationException("Unsupported operation type:"
+            + op.getOperationType());
+      }
+
+      List<String> args = op.getArguments();
+      if (args.size() != 1) {
+        LOG.warn("Invalid number of args: " + args.size());
+        throw new PrivilegedOperationException("Invalid number of args: "
+            + args.size());
+      }
+
+      String arg = args.get(0);
+      String tasksFile = StringUtils.substringAfter(arg,
+          PrivilegedOperation.CGROUP_ARG_PREFIX);
+      if (tasksFile == null || tasksFile.isEmpty()) {
+        LOG.warn("Invalid argument: " + arg);
+        throw new PrivilegedOperationException("Invalid argument: " + arg);
+      }
+
+      if (tasksFile.equals("none")) {
+        //Don't append to finalOpArg
+        continue;
+      }
+
+      if (noneArgsOnly == false) {
+        //We have already appended at least one tasks file.
+        finalOpArg.append(",");
+        finalOpArg.append(tasksFile);
+      } else {
+        finalOpArg.append(tasksFile);
+        noneArgsOnly = false;
+      }
+    }
+
+    if (noneArgsOnly) {
+      finalOpArg.append("none"); //there were no tasks file to append
+    }
+
+    PrivilegedOperation finalOp = new PrivilegedOperation(
+        PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, finalOpArg
+        .toString());
+
+    return finalOp;
+  }
+}
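
To illustrate the squashing utility above, here is a hedged sketch (not part of the commit) that merges two 'add pid to cgroup' operations into one; the tasks-file paths are invented, and the merged argument mirrors the behavior verified in TestPrivilegedOperationExecutor below:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;

    class SquashSketch {
      static PrivilegedOperation squashTasksFiles()
          throws PrivilegedOperationException {
        // Two 'add pid to cgroup' operations with hypothetical tasks files.
        List<PrivilegedOperation> ops = new ArrayList<>();
        ops.add(new PrivilegedOperation(
            PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
            PrivilegedOperation.CGROUP_ARG_PREFIX
                + "cpu/hadoop-yarn/container_01/tasks"));
        ops.add(new PrivilegedOperation(
            PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
            PrivilegedOperation.CGROUP_ARG_PREFIX
                + "net_cls/hadoop-yarn/container_01/tasks"));

        // The result carries a single argument of the form:
        // cgroups=cpu/hadoop-yarn/container_01/tasks,net_cls/hadoop-yarn/container_01/tasks
        return PrivilegedOperationExecutor.squashCGroupOperations(ops);
      }
    }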

+ 132 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java

@@ -0,0 +1,132 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Provides CGroups functionality. Implementations are expected to be
+ * thread-safe.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface CGroupsHandler {
+  public enum CGroupController {
+    CPU("cpu"),
+    NET_CLS("net_cls");
+
+    private final String name;
+
+    CGroupController(String name) {
+      this.name = name;
+    }
+
+    String getName() {
+      return name;
+    }
+  }
+
+  public static final String CGROUP_FILE_TASKS = "tasks";
+  public static final String CGROUP_PARAM_CLASSID = "classid";
+
+  /**
+   * Mounts a cgroup controller
+   * @param controller - the controller being mounted
+   * @throws ResourceHandlerException
+   */
+  public void mountCGroupController(CGroupController controller)
+      throws ResourceHandlerException;
+
+  /**
+   * Creates a cgroup for a given controller
+   * @param controller - controller type for which the cgroup is being created
+   * @param cGroupId - id of the cgroup being created
+   * @return full path to created cgroup
+   * @throws ResourceHandlerException
+   */
+  public String createCGroup(CGroupController controller, String cGroupId)
+      throws ResourceHandlerException;
+
+  /**
+   * Deletes the specified cgroup
+   * @param controller - controller type for the cgroup
+   * @param cGroupId - id of the cgroup being deleted
+   * @throws ResourceHandlerException
+   */
+  public void deleteCGroup(CGroupController controller, String cGroupId) throws
+      ResourceHandlerException;
+
+  /**
+   * Gets the full path for the cgroup, given a controller and a cgroup id
+   * @param controller - controller type for the cgroup
+   * @param cGroupId - id of the cgroup
+   * @return full path for the cgroup
+   */
+  public String getPathForCGroup(CGroupController controller, String
+      cGroupId);
+
+  /**
+   * Gets the full path for the cgroup's tasks file, given a controller and a
+   * cgroup id
+   * @param controller - controller type for the cgroup
+   * @param cGroupId - id of the cgroup
+   * @return full path for the cgroup's tasks file
+   */
+  public String getPathForCGroupTasks(CGroupController controller, String
+      cGroupId);
+
+  /**
+   * Gets the full path for a cgroup parameter, given a controller,
+   * cgroup id and parameter name
+   * @param controller - controller type for the cgroup
+   * @param cGroupId - id of the cgroup
+   * @param param - cgroup parameter (e.g. classid)
+   * @return full path for the cgroup parameter
+   */
+  public String getPathForCGroupParam(CGroupController controller, String
+      cGroupId, String param);
+
+  /**
+   * Updates a cgroup parameter, given a controller, cgroup id, parameter name
+   * and a parameter value.
+   * @param controller - controller type for the cgroup
+   * @param cGroupId - id of the cgroup
+   * @param param - cgroup parameter (e.g. classid)
+   * @param value - value to be written to the parameter file
+   * @throws ResourceHandlerException
+   */
+  public void updateCGroupParam(CGroupController controller, String cGroupId,
+      String param, String value) throws ResourceHandlerException;
+
+  /**
+   * Reads a cgroup parameter value, given a controller, cgroup id, and
+   * parameter name.
+   * @param controller - controller type for the cgroup
+   * @param cGroupId - id of the cgroup
+   * @param param - cgroup parameter (e.g. classid)
+   * @return parameter value as read from the parameter file
+   * @throws ResourceHandlerException
+   */
+  public String getCGroupParam(CGroupController controller, String cGroupId,
+      String param) throws ResourceHandlerException;
+}
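
As a hedged illustration of the interface contract above (not part of the commit): "container_01" and the "shares" parameter, which resolves to the cpu.shares file via getPathForCGroupParam(), are assumptions made for the example.

    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;

    class CGroupsHandlerUsageSketch {
      static void demo(CGroupsHandler handler) throws ResourceHandlerException {
        CGroupsHandler.CGroupController cpu =
            CGroupsHandler.CGroupController.CPU;
        String cGroupId = "container_01"; // hypothetical cgroup id

        // Create, tune, read back, and tear down a per-container cgroup.
        String path = handler.createCGroup(cpu, cGroupId);
        handler.updateCGroupParam(cpu, cGroupId, "shares", "1024");
        String shares = handler.getCGroupParam(cpu, cGroupId, "shares");
        handler.deleteCGroup(cpu, cGroupId);
      }
    }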

+ 436 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java

@@ -0,0 +1,436 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Support for interacting with various CGroup subsystems. Thread-safe.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class CGroupsHandlerImpl implements CGroupsHandler {
+
+  private static final Log LOG = LogFactory.getLog(CGroupsHandlerImpl.class);
+  private static final String MTAB_FILE = "/proc/mounts";
+  private static final String CGROUPS_FSTYPE = "cgroup";
+
+  private final String cGroupPrefix;
+  private final boolean enableCGroupMount;
+  private final String cGroupMountPath;
+  private final long deleteCGroupTimeout;
+  private final long deleteCGroupDelay;
+  private final Map<CGroupController, String> controllerPaths;
+  private final ReadWriteLock rwLock;
+  private final PrivilegedOperationExecutor privilegedOperationExecutor;
+  private final Clock clock;
+
+  public CGroupsHandlerImpl(Configuration conf, PrivilegedOperationExecutor
+      privilegedOperationExecutor) throws ResourceHandlerException {
+    this.cGroupPrefix = conf.get(YarnConfiguration.
+        NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, "/hadoop-yarn")
+        .replaceAll("^/", "").replaceAll("$/", "");
+    this.enableCGroupMount = conf.getBoolean(YarnConfiguration.
+        NM_LINUX_CONTAINER_CGROUPS_MOUNT, false);
+    this.cGroupMountPath = conf.get(YarnConfiguration.
+        NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH, null);
+    this.deleteCGroupTimeout = conf.getLong(
+        YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT,
+        YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT);
+    this.deleteCGroupDelay =
+        conf.getLong(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY,
+            YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY);
+    this.controllerPaths = new HashMap<>();
+    this.rwLock = new ReentrantReadWriteLock();
+    this.privilegedOperationExecutor = privilegedOperationExecutor;
+    this.clock = new SystemClock();
+
+    init();
+  }
+
+  private void init() throws ResourceHandlerException {
+    initializeControllerPaths();
+  }
+
+  private String getControllerPath(CGroupController controller) {
+    try {
+      rwLock.readLock().lock();
+      return controllerPaths.get(controller);
+    } finally {
+      rwLock.readLock().unlock();
+    }
+  }
+
+  private void initializeControllerPaths() throws ResourceHandlerException {
+    if (enableCGroupMount) {
+      //nothing to do here - we support 'deferred' mounting of specific
+      //controllers - we'll populate the path for a given controller when an
+      //explicit mountCGroupController request is issued.
+      LOG.info("CGroup controller mounting enabled.");
+    } else {
+      //cluster admins are expected to have mounted controllers in specific
+      //locations - we'll attempt to figure out mount points
+      initializeControllerPathsFromMtab();
+    }
+  }
+
+  private void initializeControllerPathsFromMtab()
+      throws ResourceHandlerException {
+    try {
+      Map<String, List<String>> parsedMtab = parseMtab();
+
+      //we want to do a bulk update without the paths changing concurrently
+      rwLock.writeLock().lock();
+
+      for (CGroupController controller : CGroupController.values()) {
+        String name = controller.getName();
+        String controllerPath = findControllerInMtab(name, parsedMtab);
+
+        if (controllerPath != null) {
+          File f = new File(controllerPath + "/" + this.cGroupPrefix);
+
+          if (FileUtil.canWrite(f)) {
+            controllerPaths.put(controller, controllerPath);
+          } else {
+            String error =
+                new StringBuffer("Mount point Based on mtab file: ")
+                    .append(MTAB_FILE).append(
+                    ". Controller mount point not writable for: ")
+                    .append(name).toString();
+
+            LOG.error(error);
+            throw new ResourceHandlerException(error);
+          }
+        } else {
+          LOG.warn("Controller not mounted but automount disabled: " + name);
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to initialize controller paths! Exception: " + e);
+      throw new ResourceHandlerException(
+          "Failed to initialize controller paths!");
+    } finally {
+      rwLock.writeLock().unlock();
+    }
+  }
+
+  /* We are looking for entries of the form:
+   * none /cgroup/path/mem cgroup rw,memory 0 0
+   *
+   * Use a simple pattern that splits on the five spaces, and
+   * grabs the 2nd, 3rd, and 4th fields.
+   */
+
+  private static final Pattern MTAB_FILE_FORMAT = Pattern.compile(
+      "^[^\\s]+\\s([^\\s]+)\\s([^\\s]+)\\s([^\\s]+)\\s[^\\s]+\\s[^\\s]+$");
+
+  /*
+   * Returns a map: path -> mount options
+   * for mounts with type "cgroup". Cgroup controllers will
+   * appear in the list of options for a path.
+   */
+  private Map<String, List<String>> parseMtab() throws IOException {
+    Map<String, List<String>> ret = new HashMap<String, List<String>>();
+    BufferedReader in = null;
+
+    try {
+      FileInputStream fis = new FileInputStream(new File(getMtabFileName()));
+      in = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
+
+      for (String str = in.readLine(); str != null;
+           str = in.readLine()) {
+        Matcher m = MTAB_FILE_FORMAT.matcher(str);
+        boolean mat = m.find();
+        if (mat) {
+          String path = m.group(1);
+          String type = m.group(2);
+          String options = m.group(3);
+
+          if (type.equals(CGROUPS_FSTYPE)) {
+            List<String> value = Arrays.asList(options.split(","));
+            ret.put(path, value);
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new IOException("Error while reading " + getMtabFileName(), e);
+    } finally {
+      IOUtils.cleanup(LOG, in);
+    }
+
+    return ret;
+  }
+
+  private String findControllerInMtab(String controller,
+      Map<String, List<String>> entries) {
+    for (Map.Entry<String, List<String>> e : entries.entrySet()) {
+      if (e.getValue().contains(controller))
+        return e.getKey();
+    }
+
+    return null;
+  }
+
+  String getMtabFileName() {
+    return MTAB_FILE;
+  }
+
+  @Override
+  public void mountCGroupController(CGroupController controller)
+      throws ResourceHandlerException {
+    if (!enableCGroupMount) {
+      LOG.warn("CGroup mounting is disabled - ignoring mount request for: " +
+          controller.getName());
+      return;
+    }
+
+    String path = getControllerPath(controller);
+
+    if (path == null) {
+      try {
+        //lock out other readers/writers till we are done
+        rwLock.writeLock().lock();
+
+        String hierarchy = cGroupPrefix;
+        StringBuffer controllerPath = new StringBuffer()
+            .append(cGroupMountPath).append('/').append(controller.getName());
+        StringBuffer cGroupKV = new StringBuffer()
+            .append(controller.getName()).append('=').append(controllerPath);
+        PrivilegedOperation.OperationType opType = PrivilegedOperation
+            .OperationType.MOUNT_CGROUPS;
+        PrivilegedOperation op = new PrivilegedOperation(opType, (String) null);
+
+        op.appendArgs(hierarchy, cGroupKV.toString());
+        LOG.info("Mounting controller " + controller.getName() + " at " +
+              controllerPath);
+        privilegedOperationExecutor.executePrivilegedOperation(op, false);
+
+        //if privileged operation succeeds, update controller paths
+        controllerPaths.put(controller, controllerPath.toString());
+
+        return;
+      } catch (PrivilegedOperationException e) {
+        LOG.error("Failed to mount controller: " + controller.getName());
+        throw new ResourceHandlerException("Failed to mount controller: "
+            + controller.getName());
+      } finally {
+        rwLock.writeLock().unlock();
+      }
+    } else {
+      LOG.info("CGroup controller already mounted at: " + path);
+      return;
+    }
+  }
+
+  @Override
+  public String getPathForCGroup(CGroupController controller, String cGroupId) {
+    return new StringBuffer(getControllerPath(controller))
+        .append('/').append(cGroupPrefix).append("/")
+        .append(cGroupId).toString();
+  }
+
+  @Override
+  public String getPathForCGroupTasks(CGroupController controller,
+      String cGroupId) {
+    return new StringBuffer(getPathForCGroup(controller, cGroupId))
+        .append('/').append(CGROUP_FILE_TASKS).toString();
+  }
+
+  @Override
+  public String getPathForCGroupParam(CGroupController controller,
+      String cGroupId, String param) {
+    return new StringBuffer(getPathForCGroup(controller, cGroupId))
+        .append('/').append(controller.getName()).append('.')
+        .append(param).toString();
+  }
+
+  @Override
+  public String createCGroup(CGroupController controller, String cGroupId)
+      throws ResourceHandlerException {
+    String path = getPathForCGroup(controller, cGroupId);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("createCgroup: " + path);
+    }
+
+    if (!new File(path).mkdir()) {
+      throw new ResourceHandlerException("Failed to create cgroup at " + path);
+    }
+
+    return path;
+  }
+
+  /*
+  * Utility routine to print first line from cgroup tasks file
+  */
+  private void logLineFromTasksFile(File cgf) {
+    String str;
+    if (LOG.isDebugEnabled()) {
+      try (BufferedReader inl =
+          new BufferedReader(new InputStreamReader(new FileInputStream(cgf
+              + "/tasks"), "UTF-8"))) {
+        if ((str = inl.readLine()) != null) {
+          LOG.debug("First line in cgroup tasks file: " + cgf + " " + str);
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to read cgroup tasks file. ", e);
+      }
+    }
+  }
+
+  /**
+   * If tasks file is empty, delete the cgroup.
+   *
+   * @param cgf object referring to the cgroup to be deleted
+   * @return Boolean indicating whether cgroup was deleted
+   */
+  boolean checkAndDeleteCgroup(File cgf) throws InterruptedException {
+    boolean deleted = false;
+    try (FileInputStream in = new FileInputStream(cgf + "/tasks")) {
+      if (in.read() == -1) {
+        /*
+         * "tasks" file is empty, sleep a bit more and then try to delete the
+         * cgroup. Some versions of linux will occasionally panic due to a race
+         * condition in this area, hence the paranoia.
+         */
+        Thread.sleep(deleteCGroupDelay);
+        deleted = cgf.delete();
+        if (!deleted) {
+          LOG.warn("Failed attempt to delete cgroup: " + cgf);
+        }
+      } else {
+        logLineFromTasksFile(cgf);
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to read cgroup tasks file. ", e);
+    }
+    return deleted;
+  }
+
+  @Override
+  public void deleteCGroup(CGroupController controller, String cGroupId)
+      throws ResourceHandlerException {
+    boolean deleted = false;
+    String cGroupPath = getPathForCGroup(controller, cGroupId);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("deleteCGroup: " + cGroupPath);
+    }
+
+    long start = clock.getTime();
+
+    do {
+      try {
+        deleted = checkAndDeleteCgroup(new File(cGroupPath));
+        if (!deleted) {
+          Thread.sleep(deleteCGroupDelay);
+        }
+      } catch (InterruptedException ex) {
+        // NOP
+      }
+    } while (!deleted && (clock.getTime() - start) < deleteCGroupTimeout);
+
+    if (!deleted) {
+      LOG.warn("Unable to delete  " + cGroupPath +
+          ", tried to delete for " + deleteCGroupTimeout + "ms");
+    }
+  }
+
+  @Override
+  public void updateCGroupParam(CGroupController controller, String cGroupId,
+      String param, String value) throws ResourceHandlerException {
+    String cGroupParamPath = getPathForCGroupParam(controller, cGroupId, param);
+    PrintWriter pw = null;
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "updateCGroupParam for path: " + cGroupParamPath + " with value " +
+              value);
+    }
+
+    try {
+      File file = new File(cGroupParamPath);
+      Writer w = new OutputStreamWriter(new FileOutputStream(file), "UTF-8");
+      pw = new PrintWriter(w);
+      pw.write(value);
+    } catch (IOException e) {
+      throw new ResourceHandlerException(new StringBuffer("Unable to write to ")
+          .append(cGroupParamPath).append(" with value: ").append(value)
+          .toString(), e);
+    } finally {
+      if (pw != null) {
+        boolean hasError = pw.checkError();
+        pw.close();
+        if (hasError) {
+          throw new ResourceHandlerException(
+              new StringBuffer("Unable to write to ")
+                  .append(cGroupParamPath).append(" with value: ").append(value)
+                  .toString());
+        }
+        if (pw.checkError()) {
+          throw new ResourceHandlerException("Error while closing cgroup file" +
+              " " + cGroupParamPath);
+        }
+      }
+    }
+  }
+
+  @Override
+  public String getCGroupParam(CGroupController controller, String cGroupId,
+      String param) throws ResourceHandlerException {
+    String cGroupParamPath = getPathForCGroupParam(controller, cGroupId, param);
+
+    try {
+      byte[] contents = Files.readAllBytes(Paths.get(cGroupParamPath));
+      return new String(contents, "UTF-8").trim();
+    } catch (IOException e) {
+      throw new ResourceHandlerException(
+          "Unable to read from " + cGroupParamPath);
+    }
+  }
+}
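
A construction sketch for the implementation above (not part of the commit); the configuration keys are the ones read in the constructor, the mount path value is illustrative, and since the class is package-private the sketch assumes the same package:

    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;

    class CGroupsHandlerImplSketch {
      static CGroupsHandler create() throws ResourceHandlerException {
        // Hypothetical values; real deployments set these in yarn-site.xml.
        YarnConfiguration conf = new YarnConfiguration();
        conf.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT, true);
        conf.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH,
            "/sys/fs/cgroup");

        CGroupsHandler handler = new CGroupsHandlerImpl(
            conf, PrivilegedOperationExecutor.getInstance(conf));
        // With mounting enabled, controllers are mounted on demand via the
        // container-executor binary.
        handler.mountCGroupController(CGroupsHandler.CGroupController.CPU);
        return handler;
      }
    }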

+ 91 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandler.java

@@ -0,0 +1,91 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+
+import java.util.List;
+
+/**
+ * Handler interface for resource subsystems' isolation and enforcement,
+ * e.g. cpu, memory, network, disks.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface ResourceHandler {
+
+  /**
+   * Bootstrap resource subsystem.
+   *
+   * @return (possibly empty) list of operations that require elevated
+   * privileges
+   */
+  List<PrivilegedOperation> bootstrap(Configuration configuration)
+      throws ResourceHandlerException;
+
+  /**
+   * Prepare a resource environment for container launch.
+   *
+   * @param container Container being launched
+   * @return (possibly empty) list of operations that require elevated
+   * privileges, e.g. a) create a custom cgroup, b) add the pid for the
+   * container to the tasks file for a cgroup.
+   * @throws ResourceHandlerException
+   */
+  List<PrivilegedOperation> preStart(Container container)
+      throws ResourceHandlerException;
+
+  /**
+   * Reacquire state for a container that was already launched.
+   *
+   * @param containerId id of the container being reacquired.
+   * @return (possibly empty) list of operations that require elevated
+   * privileges
+   * @throws ResourceHandlerException
+   */
+
+  List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
+      throws ResourceHandlerException;
+
+  /**
+   * Perform any tasks necessary after container completion
+   * @param containerId id of the container that was completed.
+   * @return (possibly empty) list of operations that require elevated
+   * privileges
+   * @throws ResourceHandlerException
+   */
+  List<PrivilegedOperation> postComplete(ContainerId containerId) throws
+      ResourceHandlerException;
+
+  /**
+   * Teardown environment for resource subsystem if requested. This method
+   * needs to be used with care since it could impact running containers.
+   *
+   * @return (possibly empty) list of operations that require elevated
+   * privileges
+   */
+  List<PrivilegedOperation> teardown() throws ResourceHandlerException;
+}
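
A minimal do-nothing implementation (a sketch, not part of the commit) shows the shape a new resource subsystem would take; the class name is hypothetical, and each lifecycle method returns an empty list to signal that no elevated operations are required:

    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;

    class NoOpResourceHandler implements ResourceHandler {
      @Override
      public List<PrivilegedOperation> bootstrap(Configuration configuration) {
        return Collections.emptyList(); // nothing to set up
      }

      @Override
      public List<PrivilegedOperation> preStart(Container container) {
        return Collections.emptyList(); // no per-launch privileged ops
      }

      @Override
      public List<PrivilegedOperation> reacquireContainer(ContainerId containerId) {
        return Collections.emptyList();
      }

      @Override
      public List<PrivilegedOperation> postComplete(ContainerId containerId) {
        return Collections.emptyList(); // no cleanup needed
      }

      @Override
      public List<PrivilegedOperation> teardown() {
        return Collections.emptyList();
      }
    }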

+ 142 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerChain.java

@@ -0,0 +1,142 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A helper class to delegate functionality to a 'chain' of
+ * ResourceHandler(s).
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ResourceHandlerChain implements ResourceHandler {
+  private final List<ResourceHandler> resourceHandlers;
+
+  public ResourceHandlerChain(List<ResourceHandler> resourceHandlers) {
+    this.resourceHandlers = resourceHandlers;
+  }
+
+  @Override
+  public List<PrivilegedOperation> bootstrap(Configuration configuration)
+      throws ResourceHandlerException {
+
+    List<PrivilegedOperation> allOperations = new
+        ArrayList<PrivilegedOperation>();
+
+    for (ResourceHandler resourceHandler : resourceHandlers) {
+      List<PrivilegedOperation> handlerOperations =
+          resourceHandler.bootstrap(configuration);
+      if (handlerOperations != null) {
+        allOperations.addAll(handlerOperations);
+      }
+
+    }
+    return allOperations;
+  }
+
+  @Override
+  public List<PrivilegedOperation> preStart(Container container)
+      throws ResourceHandlerException {
+    List<PrivilegedOperation> allOperations = new
+        ArrayList<PrivilegedOperation>();
+
+    for (ResourceHandler resourceHandler : resourceHandlers) {
+      List<PrivilegedOperation> handlerOperations =
+          resourceHandler.preStart(container);
+
+      if (handlerOperations != null) {
+        allOperations.addAll(handlerOperations);
+      }
+
+    }
+    return allOperations;
+  }
+
+  @Override
+  public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
+      throws ResourceHandlerException {
+    List<PrivilegedOperation> allOperations = new
+        ArrayList<PrivilegedOperation>();
+
+    for (ResourceHandler resourceHandler : resourceHandlers) {
+      List<PrivilegedOperation> handlerOperations =
+          resourceHandler.reacquireContainer(containerId);
+
+      if (handlerOperations != null) {
+        allOperations.addAll(handlerOperations);
+      }
+
+    }
+    return allOperations;
+  }
+
+  @Override
+  public List<PrivilegedOperation> postComplete(ContainerId containerId)
+      throws ResourceHandlerException {
+    List<PrivilegedOperation> allOperations = new
+        ArrayList<PrivilegedOperation>();
+
+    for (ResourceHandler resourceHandler : resourceHandlers) {
+      List<PrivilegedOperation> handlerOperations =
+          resourceHandler.postComplete(containerId);
+
+      if (handlerOperations != null) {
+        allOperations.addAll(handlerOperations);
+      }
+
+    }
+    return allOperations;
+  }
+
+  @Override
+  public List<PrivilegedOperation> teardown()
+      throws ResourceHandlerException {
+    List<PrivilegedOperation> allOperations = new
+        ArrayList<PrivilegedOperation>();
+
+    for (ResourceHandler resourceHandler : resourceHandlers) {
+      List<PrivilegedOperation> handlerOperations =
+          resourceHandler.teardown();
+
+      if (handlerOperations != null) {
+        allOperations.addAll(handlerOperations);
+      }
+
+    }
+    return allOperations;
+  }
+
+  List<ResourceHandler> getResourceHandlerList() {
+    return Collections.unmodifiableList(resourceHandlers);
+  }
+
+}
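
A composition sketch (not part of the commit), reusing the hypothetical NoOpResourceHandler from the ResourceHandler section above; the chain simply concatenates the privileged operations each delegate returns, skipping nulls and preserving handler order:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;

    class ChainSketch {
      static void demo() throws ResourceHandlerException {
        List<ResourceHandler> handlers = Arrays.asList(
            new NoOpResourceHandler(), new NoOpResourceHandler());
        ResourceHandlerChain chain = new ResourceHandlerChain(handlers);

        // ops holds the union of every handler's operations, in chain order.
        List<PrivilegedOperation> ops = chain.bootstrap(new YarnConfiguration());
      }
    }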

+ 47 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerException.java

@@ -0,0 +1,47 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ResourceHandlerException extends YarnException {
+  private static final long serialVersionUID = 1L;
+
+  public ResourceHandlerException() {
+    super();
+  }
+
+  public ResourceHandlerException(String message) {
+    super(message);
+  }
+
+  public ResourceHandlerException(Throwable cause) {
+    super(cause);
+  }
+
+  public ResourceHandlerException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

+ 233 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/privileged/TestPrivilegedOperationExecutor.java

@@ -0,0 +1,233 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestPrivilegedOperationExecutor {
+  private static final Log LOG = LogFactory
+      .getLog(TestPrivilegedOperationExecutor.class);
+  private String localDataDir;
+  private String customExecutorPath;
+  private Configuration nullConf = null;
+  private Configuration emptyConf;
+  private Configuration confWithExecutorPath;
+
+  private String cGroupTasksNone;
+  private String cGroupTasksInvalid;
+  private String cGroupTasks1;
+  private String cGroupTasks2;
+  private String cGroupTasks3;
+  private PrivilegedOperation opDisallowed;
+  private PrivilegedOperation opTasksNone;
+  private PrivilegedOperation opTasksInvalid;
+  private PrivilegedOperation opTasks1;
+  private PrivilegedOperation opTasks2;
+  private PrivilegedOperation opTasks3;
+
+  @Before
+  public void setup() {
+    localDataDir = System.getProperty("test.build.data");
+    customExecutorPath = localDataDir + "/bin/container-executor";
+    emptyConf = new YarnConfiguration();
+    confWithExecutorPath = new YarnConfiguration();
+    confWithExecutorPath.set(YarnConfiguration
+        .NM_LINUX_CONTAINER_EXECUTOR_PATH, customExecutorPath);
+
+    cGroupTasksNone = "none";
+    cGroupTasksInvalid = "invalid_string";
+    cGroupTasks1 = "cpu/hadoop_yarn/container_01/tasks";
+    cGroupTasks2 = "net_cls/hadoop_yarn/container_01/tasks";
+    cGroupTasks3 = "blkio/hadoop_yarn/container_01/tasks";
+    opDisallowed = new PrivilegedOperation
+        (PrivilegedOperation.OperationType.DELETE_AS_USER, (String) null);
+    opTasksNone = new PrivilegedOperation
+        (PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+            PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupTasksNone);
+    opTasksInvalid = new PrivilegedOperation
+        (PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+            cGroupTasksInvalid);
+    opTasks1 = new PrivilegedOperation
+        (PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+            PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupTasks1);
+    opTasks2 = new PrivilegedOperation
+        (PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+            PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupTasks2);
+    opTasks3 = new PrivilegedOperation
+        (PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+            PrivilegedOperation.CGROUP_ARG_PREFIX + cGroupTasks3);
+  }
+
+  @Test
+  public void testExecutorPath() {
+    String containerExePath = PrivilegedOperationExecutor
+        .getContainerExecutorExecutablePath(nullConf);
+
+    //In case HADOOP_YARN_HOME isn't set, CWD is used. If conf is null or
+    //NM_LINUX_CONTAINER_EXECUTOR_PATH is not set, then a defaultPath is
+    //constructed.
+    String yarnHomeEnvVar = System.getenv("HADOOP_YARN_HOME");
+    String yarnHome = yarnHomeEnvVar != null ? yarnHomeEnvVar
+        : new File("").getAbsolutePath();
+    String expectedPath = yarnHome + "/bin/container-executor";
+
+    Assert.assertEquals(expectedPath, containerExePath);
+
+    containerExePath = PrivilegedOperationExecutor
+        .getContainerExecutorExecutablePath(emptyConf);
+    Assert.assertEquals(expectedPath, containerExePath);
+
+    //if NM_LINUX_CONTAINER_EXECUTOR_PATH is set, this must be returned
+    expectedPath = customExecutorPath;
+    containerExePath = PrivilegedOperationExecutor
+        .getContainerExecutorExecutablePath(confWithExecutorPath);
+    Assert.assertEquals(expectedPath, containerExePath);
+  }
+
+  @Test
+  public void testExecutionCommand() {
+    PrivilegedOperationExecutor exec = PrivilegedOperationExecutor
+        .getInstance(confWithExecutorPath);
+    PrivilegedOperation op = new PrivilegedOperation(PrivilegedOperation
+        .OperationType.LAUNCH_CONTAINER, (String) null);
+    String[] cmdArray = exec.getPrivilegedOperationExecutionCommand(null, op);
+
+    //No arguments added - so the resulting array should consist of
+    //1)full path to executor 2) cli switch
+    Assert.assertEquals(2, cmdArray.length);
+    Assert.assertEquals(customExecutorPath, cmdArray[0]);
+    Assert.assertEquals(op.getOperationType().getOption(), cmdArray[1]);
+
+    //other (dummy) arguments to launch container
+    String[] additionalArgs = { "test_user", "yarn", "1", "app_01",
+        "container_01", "workdir", "launch_script.sh", "tokens", "pidfile",
+        "nm-local-dirs", "nm-log-dirs", "resource-spec" };
+
+    op.appendArgs(additionalArgs);
+    cmdArray = exec.getPrivilegedOperationExecutionCommand(null, op);
+
+    //Resulting array should be of length 2 greater than the number of
+    //additional arguments added.
+
+    Assert.assertEquals(2 + additionalArgs.length, cmdArray.length);
+    Assert.assertEquals(customExecutorPath, cmdArray[0]);
+    Assert.assertEquals(op.getOperationType().getOption(), cmdArray[1]);
+
+    //Rest of args should be same as additional args.
+    for (int i = 0; i < additionalArgs.length; ++i) {
+      Assert.assertEquals(additionalArgs[i], cmdArray[2 + i]);
+    }
+
+    //Now test prefix commands
+    List<String> prefixCommands = Arrays.asList("nice", "-10");
+    cmdArray = exec.getPrivilegedOperationExecutionCommand(prefixCommands, op);
+    int prefixLength = prefixCommands.size();
+    //Resulting array should be of length of prefix command args + 2 (exec
+    // path + switch) + length of additional args.
+    Assert.assertEquals(prefixLength + 2 + additionalArgs.length,
+        cmdArray.length);
+
+    //Prefix command array comes first
+    for (int i = 0; i < prefixLength; ++i) {
+      Assert.assertEquals(prefixCommands.get(i), cmdArray[i]);
+    }
+
+    //Followed by the container executor path and the cli switch
+    Assert.assertEquals(customExecutorPath, cmdArray[prefixLength]);
+    Assert.assertEquals(op.getOperationType().getOption(),
+        cmdArray[prefixLength + 1]);
+
+    //Followed by the rest of the args, which should match the additional args
+    for (int i = 0; i < additionalArgs.length; ++i) {
+      Assert.assertEquals(additionalArgs[i], cmdArray[prefixLength + 2 + i]);
+    }
+  }
+
+  @Test
+  public void testSquashCGroupOperationsWithInvalidOperations() {
+    List<PrivilegedOperation> ops = new ArrayList<>();
+
+    //Ensure that disallowed ops are rejected
+    ops.add(opTasksNone);
+    ops.add(opDisallowed);
+
+    try {
+      PrivilegedOperationExecutor.squashCGroupOperations(ops);
+      Assert.fail("Expected squash operation to fail with an exception!");
+    } catch (PrivilegedOperationException e) {
+      LOG.info("Caught expected exception : " + e);
+    }
+
+    //Ensure that invalid strings are rejected
+    ops.clear();
+    ops.add(opTasksNone);
+    ops.add(opTasksInvalid);
+
+    try {
+      PrivilegedOperationExecutor.squashCGroupOperations(ops);
+      Assert.fail("Expected squash operation to fail with an exception!");
+    } catch (PrivilegedOperationException e) {
+      LOG.info("Caught expected exception : " + e);
+    }
+  }
+
+  @Test
+  public void testSquashCGroupOperationsWithValidOperations() {
+    List<PrivilegedOperation> ops = new ArrayList<>();
+    //Test squashing, including 'none'
+    ops.add(opTasks1);
+    //this is expected to be ignored
+    ops.add(opTasksNone);
+    ops.add(opTasks2);
+    ops.add(opTasks3);
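+    //Squashing is expected to fold these into a single operation whose lone
+    //argument is CGROUP_ARG_PREFIX immediately followed by the
+    //comma-separated tasks files, with the 'none' entry dropped:
+    //  CGROUP_ARG_PREFIX + "<tasks1>,<tasks2>,<tasks3>"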
+
+    try {
+      PrivilegedOperation op = PrivilegedOperationExecutor
+          .squashCGroupOperations(ops);
+      String expected = new StringBuffer
+          (PrivilegedOperation.CGROUP_ARG_PREFIX)
+          .append(cGroupTasks1).append(',')
+          .append(cGroupTasks2).append(',')
+          .append(cGroupTasks3).toString();
+
+      //We expect exactly one argument
+      Assert.assertEquals(1, op.getArguments().size());
+      //Squashed list of tasks files
+      Assert.assertEquals(expected, op.getArguments().get(0));
+    } catch (PrivilegedOperationException e) {
+      LOG.info("Caught unexpected exception : " + e);
+      Assert.fail("Caught unexpected exception: " + e);
+    }
+  }
+}

+ 235 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsHandlerImpl.java

@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+public class TestCGroupsHandlerImpl {
+  private static final Log LOG =
+      LogFactory.getLog(TestCGroupsHandlerImpl.class);
+
+  private PrivilegedOperationExecutor privilegedOperationExecutorMock;
+  private Configuration conf;
+  private String tmpPath;
+  private String hierarchy;
+  private CGroupsHandler.CGroupController controller;
+  private String controllerPath;
+
+  @Before
+  public void setup() {
+    privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class);
+    conf = new YarnConfiguration();
+    tmpPath = System.getProperty("test.build.data") + "/cgroups";
+    //no leading or trailing slashes here
+    hierarchy = "test-hadoop-yarn";
+
+    conf.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, hierarchy);
+    conf.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT, true);
+    conf.set(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH, tmpPath);
+    controller = CGroupsHandler.CGroupController.NET_CLS;
+    controllerPath = new StringBuffer(tmpPath).append('/')
+        .append(controller.getName()).append('/').append(hierarchy).toString();
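+    //i.e. <mount-path>/<controller-name>/<hierarchy>, which should resolve
+    //to something like <test.build.data>/cgroups/net_cls/test-hadoop-yarn
+    //(assuming NET_CLS.getName() returns "net_cls")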
+  }
+
+  @Test
+  public void testMountController() {
+    CGroupsHandler cGroupsHandler = null;
+    //Since cgroup controller mounting is enabled but deferred until it is
+    //explicitly requested, no interactions with this mock should have
+    //occurred yet
+    verifyZeroInteractions(privilegedOperationExecutorMock);
+
+    try {
+      cGroupsHandler = new CGroupsHandlerImpl(conf,
+          privilegedOperationExecutorMock);
+      PrivilegedOperation expectedOp = new PrivilegedOperation
+          (PrivilegedOperation.OperationType.MOUNT_CGROUPS, (String) null);
+      //This is expected to be of the form:
+      //net_cls=<mount_path>/net_cls
+      StringBuffer controllerKV = new StringBuffer(controller.getName())
+          .append('=').append(tmpPath).append('/').append(controller.getName());
+      expectedOp.appendArgs(hierarchy, controllerKV.toString());
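+      //i.e. the mount op is expected to carry exactly two arguments:
+      //  { "test-hadoop-yarn", "net_cls=<tmpPath>/net_cls" }
+      //(controller name assumed to be "net_cls", per the comment above)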
+
+      cGroupsHandler.mountCGroupController(controller);
+      try {
+        ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass
+            (PrivilegedOperation.class);
+        verify(privilegedOperationExecutorMock)
+            .executePrivilegedOperation(opCaptor.capture(), eq(false));
+
+        //Explicitly capture the op passed to the mock and assert that the
+        //captured op and the expected op are identical
+        Assert.assertEquals(expectedOp, opCaptor.getValue());
+        verifyNoMoreInteractions(privilegedOperationExecutorMock);
+
+        //Try mounting the same controller again - this should be a no-op
+        cGroupsHandler.mountCGroupController(controller);
+        verifyNoMoreInteractions(privilegedOperationExecutorMock);
+      } catch (PrivilegedOperationException e) {
+        LOG.error("Caught exception: " + e);
+        Assert.fail("Unexpected PrivilegedOperationException from mock!");
+      }
+    } catch (ResourceHandlerException e) {
+      LOG.error("Caught exception: " + e);
+      Assert.fail("Unexpected ResourceHandlerException!");
+    }
+  }
+
+  @Test
+  public void testCGroupPaths() {
+    //JUnit creates a new test class instance (and hence a fresh mock) for
+    //each test method
+    verifyZeroInteractions(privilegedOperationExecutorMock);
+    CGroupsHandler cGroupsHandler = null;
+    try {
+      cGroupsHandler = new CGroupsHandlerImpl(conf,
+          privilegedOperationExecutorMock);
+      cGroupsHandler.mountCGroupController(controller);
+    } catch (ResourceHandlerException e) {
+      LOG.error("Caught exception: " + e);
+      Assert.fail(
+          "Unexpected ResourceHandlerException when mounting controller!");
+    }
+
+    String testCGroup = "container_01";
+    String expectedPath = new StringBuffer(controllerPath).append('/')
+        .append(testCGroup).toString();
+    String path = cGroupsHandler.getPathForCGroup(controller, testCGroup);
+    Assert.assertEquals(expectedPath, path);
+
+    String expectedPathTasks = new StringBuffer(expectedPath).append('/')
+        .append(CGroupsHandler.CGROUP_FILE_TASKS).toString();
+    path = cGroupsHandler.getPathForCGroupTasks(controller, testCGroup);
+    Assert.assertEquals(expectedPathTasks, path);
+
+    String param = CGroupsHandler.CGROUP_PARAM_CLASSID;
+    String expectedPathParam = new StringBuffer(expectedPath).append('/')
+        .append(controller.getName()).append('.').append(param).toString();
+    path = cGroupsHandler.getPathForCGroupParam(controller, testCGroup, param);
+    Assert.assertEquals(expectedPathParam, path);
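+    //Path scheme exercised above, as encoded in the assertions:
+    //  cgroup:     <controller-path>/<cgroup>
+    //  tasks file: <controller-path>/<cgroup>/<CGROUP_FILE_TASKS>
+    //  param file: <controller-path>/<cgroup>/<controller>.<param>
+    //e.g. the net_cls classid param file would be .../net_cls.classid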
+  }
+
+  @Test
+  public void testCGroupOperations() {
+    //JUnit creates a new test class instance (and hence a fresh mock) for
+    //each test method
+    verifyZeroInteractions(privilegedOperationExecutorMock);
+    CGroupsHandler cGroupsHandler = null;
+
+    try {
+      cGroupsHandler = new CGroupsHandlerImpl(conf,
+          privilegedOperationExecutorMock);
+      cGroupsHandler.mountCGroupController(controller);
+    } catch (ResourceHandlerException e) {
+      LOG.error("Caught exception: " + e);
+      Assert.fail(
+          "Unexpected ResourceHandlerException when mounting controller!");
+    }
+    //Let's manually create a path to (partially) simulate a mounted
+    //controller - this is required because the handler uses a mocked
+    //privileged operation executor
+    Assert.assertTrue("Failed to create directory: " + controllerPath,
+        new File(controllerPath).mkdirs());
+
+    String testCGroup = "container_01";
+    String expectedPath = new StringBuffer(controllerPath).append('/')
+        .append(testCGroup).toString();
+    try {
+      String path = cGroupsHandler.createCGroup(controller, testCGroup);
+
+      Assert.assertTrue(new File(expectedPath).exists());
+      Assert.assertEquals(expectedPath, path);
+
+      //update param and read param tests.
+      //We don't use net_cls.classid as the test param here because cgroups
+      //provides very specific read/write semantics for classid (only
+      //numbers can be written - potentially as hex - but can be read back
+      //only as decimal)
+      String param = "test_param";
+      String paramValue = "test_param_value";
+
+      cGroupsHandler
+          .updateCGroupParam(controller, testCGroup, param, paramValue);
+      String paramPath = new StringBuffer(expectedPath).append('/')
+          .append(controller.getName()).append('.').append(param).toString();
+      File paramFile = new File(paramPath);
+
+      Assert.assertTrue(paramFile.exists());
+      try {
+        Assert.assertEquals(paramValue,
+            new String(Files.readAllBytes(paramFile.toPath())));
+      } catch (IOException e) {
+        LOG.error("Caught exception: " + e);
+        Assert.fail("Unexpected IOException trying to read cgroup param!");
+      }
+
+      Assert.assertEquals(paramValue, cGroupsHandler.getCGroupParam
+          (controller, testCGroup, param));
+
+      //We can't really do a delete test here. The Linux cgroups
+      //implementation provides additional semantics - a cgroup cannot be
+      //deleted if any tasks are still running in it, even if the user
+      //attempting the delete has the file permissions to do so - and we
+      //cannot simulate that here. Even if we created a dummy 'tasks' file,
+      //we wouldn't be able to simulate the delete behavior we need, since
+      //a cgroup can be deleted using 'rmdir' only when its tasks file is
+      //empty; such a delete is not possible with a regular non-empty
+      //directory.
+    } catch (ResourceHandlerException e) {
+      LOG.error("Caught exception: " + e);
+      Assert.fail(
+          "Unexpected ResourceHandlerException during cgroup operations!");
+    }
+  }
+
+  @After
+  public void teardown() {
+    FileUtil.fullyDelete(new File(tmpPath));
+  }
+}