
YARN-3366. Enhanced NodeManager to support classifying/shaping outgoing network bandwidth traffic originating from YARN containers. Contributed by Sidharta Seethana.

(cherry picked from commit a100be685cc4521e9949589948219231aa5d2733)
Vinod Kumar Vavilapalli, 10 years ago
Parent
Current commit
04783b0402
10 files changed, 1864 insertions and 2 deletions
  1. +4 -0
      hadoop-yarn-project/CHANGES.txt
  2. +37 -1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. +99 -1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
  4. +29 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/OutboundBandwidthResourceHandler.java
  5. +128 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java
  6. +281 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java
  7. +650 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java
  8. +78 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java
  9. +231 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficControlBandwidthHandlerImpl.java
  10. +327 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -48,6 +48,10 @@ Release 2.8.0 - UNRELEASED
     YARN-3225. New parameter of CLI for decommissioning node gracefully in 
     RMAdmin CLI. (Devaraj K via junping_du)
 
+    YARN-3366. Enhanced NodeManager to support classifying/shaping outgoing
+    network bandwidth traffic originating from YARN containers (Sidharta Seethana
+    via vinodkv)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

+ 37 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -822,7 +822,43 @@ public class YarnConfiguration extends Configuration {
       NM_PREFIX + "resource.percentage-physical-cpu-limit";
   public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
       100;
-  
+
+
+  public static final String NM_NETWORK_RESOURCE_PREFIX = NM_PREFIX + "resource.network.";
+
+  /** This setting controls if resource handling for network bandwidth is enabled **/
+  /* Work in progress: This configuration parameter may be changed/removed in the future */
+  @Private
+  public static final String NM_NETWORK_RESOURCE_ENABLED =
+      NM_NETWORK_RESOURCE_PREFIX + "enabled";
+  /** Network as a resource is disabled by default **/
+  @Private
+  public static final boolean DEFAULT_NM_NETWORK_RESOURCE_ENABLED = false;
+
+  /** Specifies the interface to be used for applying network throttling rules **/
+  /* Work in progress: This configuration parameter may be changed/removed in the future */
+  @Private
+  public static final String NM_NETWORK_RESOURCE_INTERFACE =
+      NM_NETWORK_RESOURCE_PREFIX + "interface";
+  @Private
+  public static final String DEFAULT_NM_NETWORK_RESOURCE_INTERFACE = "eth0";
+
+  /** Specifies the total available outbound bandwidth on the node **/
+  /* Work in progress: This configuration parameter may be changed/removed in the future */
+  @Private
+  public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT =
+      NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-mbit";
+  @Private
+  public static final int DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT = 1000;
+
+  /** Specifies the total outbound bandwidth available to YARN containers. defaults to
+   * NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT if not specified.
+   */
+  /* Work in progress: This configuration parameter may be changed/removed in the future */
+  @Private
+  public static final String NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT =
+      NM_NETWORK_RESOURCE_PREFIX + "outbound-bandwidth-yarn-mbit";
+
   /** NM Webapp address.**/
   public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address";
   public static final int DEFAULT_NM_WEBAPP_PORT = 8042;
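
For orientation, a minimal sketch (not part of this patch) of how the new keys might be set programmatically. The property names come from the constants added above; the interface name "eth0", the 1000/700 mbit values, and the wrapper class are illustrative assumptions.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    // Illustrative only: enables outbound-bandwidth shaping via the keys added
    // in this patch. "eth0", 1000 and 700 are example values, not requirements.
    public class NetworkResourceConfigSketch {
      public static Configuration networkShapingConf() {
        Configuration conf = new YarnConfiguration();
        conf.setBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED, true);
        conf.set(YarnConfiguration.NM_NETWORK_RESOURCE_INTERFACE, "eth0");
        conf.setInt(
            YarnConfiguration.NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT, 1000);
        conf.setInt(
            YarnConfiguration.NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT, 700);
        return conf;
      }
    }

In a deployment these keys would typically be set in yarn-site.xml as yarn.nodemanager.resource.network.* entries, which is what the NM_NETWORK_RESOURCE_PREFIX above expands to.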

+ 99 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java

@@ -43,7 +43,13 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;
 import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
 import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -60,6 +66,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
   private boolean containerSchedPriorityIsSet = false;
   private int containerSchedPriorityAdjustment = 0;
   private boolean containerLimitUsers;
+  private ResourceHandler resourceHandlerChain;
 
   @Override
   public void setConf(Configuration conf) {
@@ -191,7 +198,20 @@ public class LinuxContainerExecutor extends ContainerExecutor {
       throw new IOException("Linux container executor not configured properly"
           + " (error=" + exitCode + ")", e);
     }
-   
+
+    try {
+      Configuration conf = super.getConf();
+
+      resourceHandlerChain = ResourceHandlerModule
+          .getConfiguredResourceHandlerChain(conf);
+      if (resourceHandlerChain != null) {
+        resourceHandlerChain.bootstrap(conf);
+      }
+    } catch (ResourceHandlerException e) {
+      LOG.error("Failed to bootstrap configured resource subsystems! ", e);
+      throw new IOException("Failed to bootstrap configured resource subsystems!");
+    }
+
     resourcesHandler.init(this);
   }
   
@@ -270,6 +290,51 @@ public class LinuxContainerExecutor extends ContainerExecutor {
             container.getResource());
     String resourcesOptions = resourcesHandler.getResourcesOption(
             containerId);
+    String tcCommandFile = null;
+
+    try {
+      if (resourceHandlerChain != null) {
+        List<PrivilegedOperation> ops = resourceHandlerChain
+            .preStart(container);
+
+        if (ops != null) {
+          List<PrivilegedOperation> resourceOps = new ArrayList<>();
+
+          resourceOps.add(new PrivilegedOperation
+              (PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+                  resourcesOptions));
+
+          for (PrivilegedOperation op : ops) {
+            switch (op.getOperationType()) {
+              case ADD_PID_TO_CGROUP:
+                resourceOps.add(op);
+                break;
+              case TC_MODIFY_STATE:
+                tcCommandFile = op.getArguments().get(0);
+                break;
+              default:
+                LOG.warn("PrivilegedOperation type unsupported in launch: "
+                    + op.getOperationType());
+            }
+          }
+
+          if (resourceOps.size() > 1) {
+            //squash resource operations
+            try {
+              PrivilegedOperation operation = PrivilegedOperationExecutor
+                  .squashCGroupOperations(resourceOps);
+              resourcesOptions = operation.getArguments().get(0);
+            } catch (PrivilegedOperationException e) {
+              LOG.error("Failed to squash cgroup operations!", e);
+              throw new ResourceHandlerException("Failed to squash cgroup operations!");
+            }
+          }
+        }
+      }
+    } catch (ResourceHandlerException e) {
+      LOG.error("ResourceHandlerChain.preStart() failed!", e);
+      throw new IOException("ResourceHandlerChain.preStart() failed!");
+    }
 
     ShellCommandExecutor shExec = null;
 
@@ -288,6 +353,11 @@ public class LinuxContainerExecutor extends ContainerExecutor {
             StringUtils.join(",", localDirs),
             StringUtils.join(",", logDirs),
             resourcesOptions));
+
+        if (tcCommandFile != null) {
+            command.add(tcCommandFile);
+        }
+
         String[] commandArray = command.toArray(new String[command.size()]);
         shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
             container.getLaunchContext().getEnvironment()); // sanitized env
@@ -336,6 +406,15 @@ public class LinuxContainerExecutor extends ContainerExecutor {
       return exitCode;
     } finally {
       resourcesHandler.postExecute(containerId);
+
+      try {
+        if (resourceHandlerChain != null) {
+          resourceHandlerChain.postComplete(containerId);
+        }
+      } catch (ResourceHandlerException e) {
+        LOG.warn("ResourceHandlerChain.postComplete failed for " +
+            "containerId: " + containerId + ". Exception: " + e);
+      }
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Output from LinuxContainerExecutor's launchContainer follows:");
@@ -348,9 +427,28 @@ public class LinuxContainerExecutor extends ContainerExecutor {
   public int reacquireContainer(String user, ContainerId containerId)
       throws IOException, InterruptedException {
     try {
+      //Resource handler chain needs to reacquire container state
+      //as well
+      if (resourceHandlerChain != null) {
+        try {
+          resourceHandlerChain.reacquireContainer(containerId);
+        } catch (ResourceHandlerException e) {
+          LOG.warn("ResourceHandlerChain.reacquireContainer failed for " +
+              "containerId: " + containerId + " Exception: " + e);
+        }
+      }
+
       return super.reacquireContainer(user, containerId);
     } finally {
       resourcesHandler.postExecute(containerId);
+      if (resourceHandlerChain != null) {
+        try {
+          resourceHandlerChain.postComplete(containerId);
+        } catch (ResourceHandlerException e) {
+          LOG.warn("ResourceHandlerChain.postComplete failed for " +
+              "containerId: " + containerId + " Exception: " + e);
+        }
+      }
     }
   }
 

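Taken together, the executor changes drive a simple handler-chain lifecycle: bootstrap() once at init time, preStart() at container launch (yielding cgroup and tc privileged operations), and postComplete()/reacquireContainer() tracking the container's own lifecycle. Below is a hedged sketch of that flow using only types from this patch; the wrapper class and method are illustrative, not part of the change.

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;

    // Illustrative wrapper; LinuxContainerExecutor performs these steps itself.
    public class HandlerChainLifecycleSketch {
      public static void launchWithShaping(Configuration conf, Container container)
          throws ResourceHandlerException {
        ResourceHandlerChain chain =
            ResourceHandlerModule.getConfiguredResourceHandlerChain(conf);
        if (chain == null) {
          return; // nothing configured, e.g. network-as-a-resource is disabled
        }
        chain.bootstrap(conf);                       // done once, in init()
        // Per launch: yields ADD_PID_TO_CGROUP and TC_MODIFY_STATE operations,
        // which the executor squashes/passes to container-executor (see above).
        List<PrivilegedOperation> ops = chain.preStart(container);
        // ...launch the container here...
        chain.postComplete(container.getContainerId()); // cleanup on completion
      }
    }
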
+ 29 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/OutboundBandwidthResourceHandler.java

@@ -0,0 +1,29 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface OutboundBandwidthResourceHandler extends ResourceHandler {
+}

+ 128 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java

@@ -0,0 +1,128 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides mechanisms to get various resource handlers - cpu, memory, network,
+ * disk etc., - based on configuration
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ResourceHandlerModule {
+  private volatile static ResourceHandlerChain resourceHandlerChain;
+
+  /**
+   * This specific implementation might provide resource management as well
+   * as resource metrics functionality. We need to ensure that the same
+   * instance is used for both.
+   */
+  private volatile static TrafficControlBandwidthHandlerImpl
+      trafficControlBandwidthHandler;
+  private volatile static CGroupsHandler cGroupsHandler;
+
+  /**
+   * Returns an initialized, thread-safe CGroupsHandler instance
+   */
+  public static CGroupsHandler getCGroupsHandler(Configuration conf)
+      throws ResourceHandlerException {
+    if (cGroupsHandler == null) {
+      synchronized (CGroupsHandler.class) {
+        if (cGroupsHandler == null) {
+          cGroupsHandler = new CGroupsHandlerImpl(conf,
+              PrivilegedOperationExecutor.getInstance(conf));
+        }
+      }
+    }
+
+    return cGroupsHandler;
+  }
+
+  private static TrafficControlBandwidthHandlerImpl
+  getTrafficControlBandwidthHandler(Configuration conf)
+      throws ResourceHandlerException {
+    if (conf.getBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED,
+        YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_ENABLED)) {
+      if (trafficControlBandwidthHandler == null) {
+        synchronized (OutboundBandwidthResourceHandler.class) {
+          if (trafficControlBandwidthHandler == null) {
+            trafficControlBandwidthHandler = new
+                TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor
+                .getInstance(conf), getCGroupsHandler(conf),
+                new TrafficController(conf, PrivilegedOperationExecutor
+                    .getInstance(conf)));
+          }
+        }
+      }
+
+      return trafficControlBandwidthHandler;
+    } else {
+      return null;
+    }
+  }
+
+  public static OutboundBandwidthResourceHandler
+  getOutboundBandwidthResourceHandler(Configuration conf)
+      throws ResourceHandlerException {
+    return getTrafficControlBandwidthHandler(conf);
+  }
+
+  private static void addHandlerIfNotNull(List<ResourceHandler> handlerList,
+      ResourceHandler handler) {
+    if (handler != null) {
+      handlerList.add(handler);
+    }
+  }
+
+  private static void initializeConfiguredResourceHandlerChain(
+      Configuration conf) throws ResourceHandlerException {
+    ArrayList<ResourceHandler> handlerList = new ArrayList<>();
+
+    addHandlerIfNotNull(handlerList, getOutboundBandwidthResourceHandler(conf));
+    resourceHandlerChain = new ResourceHandlerChain(handlerList);
+  }
+
+  public static ResourceHandlerChain getConfiguredResourceHandlerChain
+      (Configuration conf) throws ResourceHandlerException {
+    if (resourceHandlerChain == null) {
+      synchronized (ResourceHandlerModule.class) {
+        if (resourceHandlerChain == null) {
+          initializeConfiguredResourceHandlerChain(conf);
+        }
+      }
+    }
+
+    if (resourceHandlerChain.getResourceHandlerList().size() != 0) {
+      return resourceHandlerChain;
+    } else {
+      return null;
+    }
+  }
+}
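
The double-checked locking above exists so that a single TrafficControlBandwidthHandlerImpl instance serves both resource enforcement and (future) metrics collection. A hedged sketch of the resulting behaviour, mirroring the unit test added later in this change; the class name is illustrative, and the cgroups mount bypass setting is borrowed from that test.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.OutboundBandwidthResourceHandler;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule;

    // Illustrative only: repeated lookups return the same handler instance.
    public class ResourceHandlerModuleUsageSketch {
      public static void main(String[] args) throws ResourceHandlerException {
        Configuration conf = new YarnConfiguration();
        conf.setBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED, true);
        // Bypass mtab parsing for cgroup mount discovery, as the test does.
        conf.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT, true);

        OutboundBandwidthResourceHandler first =
            ResourceHandlerModule.getOutboundBandwidthResourceHandler(conf);
        OutboundBandwidthResourceHandler second =
            ResourceHandlerModule.getOutboundBandwidthResourceHandler(conf);
        System.out.println("same instance: " + (first == second)); // true
      }
    }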

+ 281 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java

@@ -0,0 +1,281 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TrafficControlBandwidthHandlerImpl
+    implements OutboundBandwidthResourceHandler {
+
+  private static final Log LOG = LogFactory
+      .getLog(TrafficControlBandwidthHandlerImpl.class);
+  //In the absence of 'scheduling' support, we'll 'infer' the guaranteed
+  //outbound bandwidth for each container based on this number. This will
+  //likely go away once we add support on the RM for this resource type.
+  private static final int MAX_CONTAINER_COUNT = 50;
+
+  private final PrivilegedOperationExecutor privilegedOperationExecutor;
+  private final CGroupsHandler cGroupsHandler;
+  private final TrafficController trafficController;
+  private final ConcurrentHashMap<ContainerId, Integer> containerIdClassIdMap;
+
+  private Configuration conf;
+  private String device;
+  private boolean strictMode;
+  private int containerBandwidthMbit;
+  private int rootBandwidthMbit;
+  private int yarnBandwidthMbit;
+
+  public TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor
+      privilegedOperationExecutor, CGroupsHandler cGroupsHandler,
+      TrafficController trafficController) {
+    this.privilegedOperationExecutor = privilegedOperationExecutor;
+    this.cGroupsHandler = cGroupsHandler;
+    this.trafficController = trafficController;
+    this.containerIdClassIdMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Bootstrapping 'outbound-bandwidth' resource handler - mounts net_cls
+   * controller and bootstraps a traffic control bandwidth shaping hierarchy
+   * @param configuration yarn configuration in use
+   * @return (potentially empty) list of privileged operations to execute.
+   * @throws ResourceHandlerException
+   */
+
+  @Override
+  public List<PrivilegedOperation> bootstrap(Configuration configuration)
+      throws ResourceHandlerException {
+    conf = configuration;
+    //We'll do this inline for the time being - since this is a one time
+    //operation. At some point, LCE code can be refactored to batch mount
+    //operations across multiple controllers - cpu, net_cls, blkio etc
+    cGroupsHandler
+        .mountCGroupController(CGroupsHandler.CGroupController.NET_CLS);
+    device = conf.get(YarnConfiguration.NM_NETWORK_RESOURCE_INTERFACE,
+        YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_INTERFACE);
+    strictMode = configuration.getBoolean(YarnConfiguration
+        .NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, YarnConfiguration
+        .DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE);
+    rootBandwidthMbit = conf.getInt(YarnConfiguration
+        .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT, YarnConfiguration
+        .DEFAULT_NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT);
+    yarnBandwidthMbit = conf.getInt(YarnConfiguration
+        .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT, rootBandwidthMbit);
+    containerBandwidthMbit = (int) Math.ceil((double) yarnBandwidthMbit /
+        MAX_CONTAINER_COUNT);
+
+    StringBuffer logLine = new StringBuffer("strict mode is set to :")
+        .append(strictMode).append(System.lineSeparator());
+
+    if (strictMode) {
+      logLine.append("container bandwidth will be capped to soft limit.")
+          .append(System.lineSeparator());
+    } else {
+      logLine.append(
+          "containers will be allowed to use spare YARN bandwidth.")
+          .append(System.lineSeparator());
+    }
+
+    logLine
+        .append("containerBandwidthMbit soft limit (in mbit/sec) is set to : ")
+        .append(containerBandwidthMbit);
+
+    LOG.info(logLine);
+    trafficController.bootstrap(device, rootBandwidthMbit, yarnBandwidthMbit);
+
+    return null;
+  }
+
+  /**
+   * Pre-start hook for 'outbound-bandwidth' resource. A cgroup is created
+   * and a net_cls classid is generated and written to a cgroup file. A
+   * traffic control shaping rule is created in order to limit outbound
+   * bandwidth utilization.
+   * @param container Container being launched
+   * @return privileged operations for some cgroups/tc operations.
+   * @throws ResourceHandlerException
+   */
+  @Override
+  public List<PrivilegedOperation> preStart(Container container)
+      throws ResourceHandlerException {
+    String containerIdStr = container.getContainerId().toString();
+    int classId = trafficController.getNextClassId();
+    String classIdStr = trafficController.getStringForNetClsClassId(classId);
+
+    cGroupsHandler.createCGroup(CGroupsHandler.CGroupController
+            .NET_CLS,
+        containerIdStr);
+    cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.NET_CLS,
+        containerIdStr, CGroupsHandler.CGROUP_PARAM_CLASSID, classIdStr);
+    containerIdClassIdMap.put(container.getContainerId(), classId);
+
+    //Now create a privileged operation in order to update the tasks file with
+    //the pid of the running container process (root of process tree). This can
+    //only be done at the time of launching the container, in a privileged
+    //executable.
+    String tasksFile = cGroupsHandler.getPathForCGroupTasks(
+        CGroupsHandler.CGroupController.NET_CLS, containerIdStr);
+    String opArg = new StringBuffer(PrivilegedOperation.CGROUP_ARG_PREFIX)
+        .append(tasksFile).toString();
+    List<PrivilegedOperation> ops = new ArrayList<>();
+
+    ops.add(new PrivilegedOperation(
+        PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, opArg));
+
+    //Create a privileged operation to create a tc rule for this container
+    //We'll return this to the calling (Linux) Container Executor
+    //implementation for batching optimizations so that we don't fork/exec
+    //additional times during container launch.
+    TrafficController.BatchBuilder builder = trafficController.new
+        BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE);
+
+    builder.addContainerClass(classId, containerBandwidthMbit, strictMode);
+    ops.add(builder.commitBatchToTempFile());
+
+    return ops;
+  }
+
+  /**
+   * Reacquires state for a container - reads the classid from the cgroup
+   * being used for the container being reacquired
+   * @param containerId id of the container being reacquired.
+   * @return (potentially empty) list of privileged operations
+   * @throws ResourceHandlerException
+   */
+
+  @Override
+  public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
+      throws ResourceHandlerException {
+    String containerIdStr = containerId.toString();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Attempting to reacquire classId for container: " +
+          containerIdStr);
+    }
+
+    String classIdStrFromFile = cGroupsHandler.getCGroupParam(
+        CGroupsHandler.CGroupController.NET_CLS, containerIdStr,
+        CGroupsHandler.CGROUP_PARAM_CLASSID);
+    int classId = trafficController
+        .getClassIdFromFileContents(classIdStrFromFile);
+
+    LOG.info("Reacquired containerId -> classId mapping: " + containerIdStr
+        + " -> " + classId);
+    containerIdClassIdMap.put(containerId, classId);
+
+    return null;
+  }
+
+  /**
+   * Returns total bytes sent per container to be used for metrics tracking
+   * purposes.
+   * @return a map of containerId to bytes sent
+   * @throws ResourceHandlerException
+   */
+  public Map<ContainerId, Integer> getBytesSentPerContainer()
+      throws ResourceHandlerException {
+    Map<Integer, Integer> classIdStats = trafficController.readStats();
+    Map<ContainerId, Integer> containerIdStats = new HashMap<>();
+
+    for (Map.Entry<ContainerId, Integer> entry : containerIdClassIdMap
+        .entrySet()) {
+      ContainerId containerId = entry.getKey();
+      Integer classId = entry.getValue();
+      Integer bytesSent = classIdStats.get(classId);
+
+      if (bytesSent == null) {
+        LOG.warn("No bytes sent metric found for container: " + containerId +
+            " with classId: " + classId);
+        continue;
+      }
+      containerIdStats.put(containerId, bytesSent);
+    }
+
+    return containerIdStats;
+  }
+
+  /**
+   * Cleanup operations once container is completed - deletes cgroup and
+   * removes traffic shaping rule(s).
+   * @param containerId of the container that was completed.
+   * @return
+   * @throws ResourceHandlerException
+   */
+  @Override
+  public List<PrivilegedOperation> postComplete(ContainerId containerId)
+      throws ResourceHandlerException {
+    LOG.info("postComplete for container: " + containerId.toString());
+    cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.NET_CLS,
+        containerId.toString());
+
+    Integer classId = containerIdClassIdMap.get(containerId);
+
+    if (classId != null) {
+      PrivilegedOperation op = trafficController.new
+          BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
+          .deleteContainerClass(classId).commitBatchToTempFile();
+
+      try {
+        privilegedOperationExecutor.executePrivilegedOperation(op, false);
+        trafficController.releaseClassId(classId);
+      } catch (PrivilegedOperationException e) {
+        LOG.warn("Failed to delete tc rule for classId: " + classId);
+        throw new ResourceHandlerException(
+            "Failed to delete tc rule for classId:" + classId);
+      }
+    } else {
+      LOG.warn("Not cleaning up tc rules. classId unknown for container: " +
+          containerId.toString());
+    }
+
+    return null;
+  }
+
+  @Override
+  public List<PrivilegedOperation> teardown()
+      throws ResourceHandlerException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("teardown(): Nothing to do");
+    }
+
+    return null;
+  }
+}
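
Since the RM does not yet schedule network bandwidth, bootstrap() infers a per-container soft limit from the YARN bandwidth and the hard-coded MAX_CONTAINER_COUNT of 50. A small worked example with assumed numbers (700 mbit/s reserved for YARN); the wrapper class is illustrative.

    // Worked example: per-container rate/ceil under the handler's bootstrap math.
    public class ContainerBandwidthMath {
      public static void main(String[] args) {
        int yarnBandwidthMbit = 700;  // ...outbound-bandwidth-yarn-mbit (assumed)
        int maxContainerCount = 50;   // MAX_CONTAINER_COUNT in the handler above

        // Per-container soft limit, as computed in bootstrap(): ceil(700 / 50) = 14
        int containerBandwidthMbit =
            (int) Math.ceil((double) yarnBandwidthMbit / maxContainerCount);

        // Strict mode: ceil == rate (14 mbit/s). Otherwise a container may borrow
        // spare YARN bandwidth, so ceil == the 700 mbit/s YARN root class.
        System.out.println("rate=" + containerBandwidthMbit + "mbit"
            + ", ceil(strict)=" + containerBandwidthMbit + "mbit"
            + ", ceil(default)=" + yarnBandwidthMbit + "mbit");
      }
    }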

+ 650 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficController.java

@@ -0,0 +1,650 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Wrapper around the 'tc' tool. Provides access to a very specific subset of
+ * the functionality provided by the tc tool.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable class TrafficController {
+  private static final Log LOG = LogFactory.getLog(TrafficController.class);
+  private static final int ROOT_QDISC_HANDLE = 42;
+  private static final int ZERO_CLASS_ID = 0;
+  private static final int ROOT_CLASS_ID = 1;
+  /** Traffic shaping class used for all unclassified traffic */
+  private static final int DEFAULT_CLASS_ID = 2;
+  /** Traffic shaping class used for all YARN traffic */
+  private static final int YARN_ROOT_CLASS_ID = 3;
+  /** Classes 0-3 are used already. We need to ensure that container classes
+   * do not collide with these classids.
+   */
+  private static final int MIN_CONTAINER_CLASS_ID = 4;
+  /** This is the number of distinct (container) traffic shaping classes
+   * that are supported */
+  private static final int MAX_CONTAINER_CLASSES = 1024;
+
+  private static final String MBIT_SUFFIX = "mbit";
+  private static final String TMP_FILE_PREFIX = "tc.";
+  private static final String TMP_FILE_SUFFIX = ".cmds";
+
+  /** Root queuing discipline attached to the root of the interface */
+  private static final String FORMAT_QDISC_ADD_TO_ROOT_WITH_DEFAULT =
+      "qdisc add dev %s root handle %d: htb default %s";
+  /** Specifies a cgroup/classid based filter - based on the classid associated
+   * with the outbound packet, the corresponding traffic shaping rule is used
+   * . Please see tc documentation for additional details.
+   */
+  private static final String FORMAT_FILTER_CGROUP_ADD_TO_PARENT =
+      "filter add dev %s parent %d: protocol ip prio 10 handle 1: cgroup";
+  /** Standard format for adding a traffic shaping class to a parent, with
+   * the specified bandwidth limits
+   */
+  private static final String FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES =
+      "class add dev %s parent %d:%d classid %d:%d htb rate %s ceil %s";
+  /** Standard format to delete a traffic shaping class */
+  private static final String FORMAT_DELETE_CLASS =
+      "class del dev %s classid %d:%d";
+  /** Format of the classid that is to be used with the net_cls cgroup. Needs
+   * to be of the form 0xAAAABBBB */
+  private static final String FORMAT_NET_CLS_CLASS_ID = "0x%04d%04d";
+  /** Commands to read the qdsic(s)/filter(s)/class(es) associated with an
+   * interface
+   */
+  private static final String FORMAT_READ_STATE =
+      "qdisc show dev %1$s%n" +
+          "filter show dev %1$s%n" +
+          "class show dev %1$s";
+  private static final String FORMAT_READ_CLASSES = "class show dev %s";
+  /** Delete a qdisc and all its children - classes/filters etc */
+  private static final String FORMAT_WIPE_STATE =
+      "qdisc del dev %s parent root";
+
+  private final Configuration conf;
+  //Used to store the set of classids in use for container classes
+  private final BitSet classIdSet;
+  private final PrivilegedOperationExecutor privilegedOperationExecutor;
+
+  private String tmpDirPath;
+  private String device;
+  private int rootBandwidthMbit;
+  private int yarnBandwidthMbit;
+  private int defaultClassBandwidthMbit;
+
+  TrafficController(Configuration conf, PrivilegedOperationExecutor exec) {
+    this.conf = conf;
+    this.classIdSet = new BitSet(MAX_CONTAINER_CLASSES);
+    this.privilegedOperationExecutor = exec;
+  }
+
+  /**
+   * Bootstrap tc configuration
+   */
+  public void bootstrap(String device, int rootBandwidthMbit, int
+      yarnBandwidthMbit)
+      throws ResourceHandlerException {
+    if (device == null) {
+      throw new ResourceHandlerException("device cannot be null!");
+    }
+
+    String tmpDirBase = conf.get("hadoop.tmp.dir");
+    if (tmpDirBase == null) {
+      throw new ResourceHandlerException("hadoop.tmp.dir not set!");
+    }
+    tmpDirPath = tmpDirBase + "/nm-tc-rules";
+
+    File tmpDir = new File(tmpDirPath);
+    if (!(tmpDir.exists() || tmpDir.mkdirs())) {
+      LOG.warn("Unable to create directory: " + tmpDirPath);
+      throw new ResourceHandlerException("Unable to create directory: " +
+          tmpDirPath);
+    }
+
+    this.device = device;
+    this.rootBandwidthMbit = rootBandwidthMbit;
+    this.yarnBandwidthMbit = yarnBandwidthMbit;
+    defaultClassBandwidthMbit = (rootBandwidthMbit - yarnBandwidthMbit) <= 0
+        ? rootBandwidthMbit : (rootBandwidthMbit - yarnBandwidthMbit);
+
+    boolean recoveryEnabled = conf.getBoolean(YarnConfiguration
+        .NM_RECOVERY_ENABLED, YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
+    String state = null;
+
+    if (!recoveryEnabled) {
+      LOG.info("NM recovery is not enabled. We'll wipe tc state before proceeding.");
+    } else {
+      //NM recovery enabled - run a state check
+      state = readState();
+      if (checkIfAlreadyBootstrapped(state)) {
+        LOG.info("TC configuration is already in place. Not wiping state.");
+
+        //We already have the list of existing container classes, if any
+        //that were created after bootstrapping
+        reacquireContainerClasses(state);
+        return;
+      } else {
+        LOG.info("TC configuration is incomplete. Wiping tc state before proceeding");
+      }
+    }
+
+    wipeState(); //start over in case previous bootstrap was incomplete
+    initializeState();
+  }
+
+  private void initializeState() throws ResourceHandlerException {
+    LOG.info("Initializing tc state.");
+
+    BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
+        OperationType.TC_MODIFY_STATE)
+        .addRootQDisc()
+        .addCGroupFilter()
+        .addClassToRootQDisc(rootBandwidthMbit)
+        .addDefaultClass(defaultClassBandwidthMbit, rootBandwidthMbit)
+            //yarn bandwidth is capped with rate = ceil
+        .addYARNRootClass(yarnBandwidthMbit, yarnBandwidthMbit);
+    PrivilegedOperation op = builder.commitBatchToTempFile();
+
+    try {
+      privilegedOperationExecutor.executePrivilegedOperation(op, false);
+    } catch (PrivilegedOperationException e) {
+      LOG.warn("Failed to bootstrap outbound bandwidth configuration");
+
+      throw new ResourceHandlerException(
+          "Failed to bootstrap outbound bandwidth configuration", e);
+    }
+  }
+
+  /**
+   * Function to check if the interface in use has already been fully
+   * bootstrapped with the required tc configuration
+   *
+   * @return boolean indicating the result of the check
+   */
+  private boolean checkIfAlreadyBootstrapped(String state)
+      throws ResourceHandlerException {
+    List<String> regexes = new ArrayList<>();
+
+    //root qdisc
+    regexes.add(String.format("^qdisc htb %d: root(.)*$",
+        ROOT_QDISC_HANDLE));
+    //cgroup filter
+    regexes.add(String.format("^filter parent %d: protocol ip " +
+        "(.)*cgroup(.)*$", ROOT_QDISC_HANDLE));
+    //root, default and yarn classes
+    regexes.add(String.format("^class htb %d:%d root(.)*$",
+        ROOT_QDISC_HANDLE, ROOT_CLASS_ID));
+    regexes.add(String.format("^class htb %d:%d parent %d:%d(.)*$",
+        ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID, ROOT_QDISC_HANDLE, ROOT_CLASS_ID));
+    regexes.add(String.format("^class htb %d:%d parent %d:%d(.)*$",
+        ROOT_QDISC_HANDLE, YARN_ROOT_CLASS_ID, ROOT_QDISC_HANDLE,
+        ROOT_CLASS_ID));
+
+    for (String regex : regexes) {
+      Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);
+
+      if (pattern.matcher(state).find()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Matched regex: " + regex);
+        }
+      } else {
+        String logLine = new StringBuffer("Failed to match regex: ")
+              .append(regex).append(" Current state: ").append(state).toString();
+        LOG.warn(logLine);
+        return false;
+      }
+    }
+
+    LOG.info("Bootstrap check succeeded");
+
+    return true;
+  }
+
+  private String readState() throws ResourceHandlerException {
+    //Sample state output:
+    //    qdisc htb 42: root refcnt 2 r2q 10 default 2 direct_packets_stat 0
+    //    filter parent 42: protocol ip pref 10 cgroup handle 0x1
+    //
+    //    filter parent 42: protocol ip pref 10 cgroup handle 0x1
+    //
+    //    class htb 42:1 root rate 10000Kbit ceil 10000Kbit burst 1600b cburst 1600b
+    //    class htb 42:2 parent 42:1 prio 0 rate 3000Kbit ceil 10000Kbit burst 1599b cburst 1600b
+    //    class htb 42:3 parent 42:1 prio 0 rate 7000Kbit ceil 7000Kbit burst 1598b cburst 1598b
+
+    BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
+        OperationType.TC_READ_STATE)
+        .readState();
+    PrivilegedOperation op = builder.commitBatchToTempFile();
+
+    try {
+      String output =
+          privilegedOperationExecutor.executePrivilegedOperation(op, true);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("TC state: %n" + output);
+      }
+
+      return output;
+    } catch (PrivilegedOperationException e) {
+      LOG.warn("Failed to bootstrap outbound bandwidth rules");
+      throw new ResourceHandlerException(
+          "Failed to bootstrap outbound bandwidth rules", e);
+    }
+  }
+
+  private void wipeState() throws ResourceHandlerException {
+    BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
+        OperationType.TC_MODIFY_STATE)
+        .wipeState();
+    PrivilegedOperation op = builder.commitBatchToTempFile();
+
+    try {
+      LOG.info("Wiping tc state.");
+      privilegedOperationExecutor.executePrivilegedOperation(op, false);
+    } catch (PrivilegedOperationException e) {
+      LOG.warn("Failed to wipe tc state. This could happen if the interface" +
+          " is already in its default state. Ignoring.");
+      //Ignoring this exception. This could happen if the interface is already
+      //in its default state. For this reason we don't throw a
+      //ResourceHandlerException here.
+    }
+  }
+
+  /**
+   * Parses the current state and looks for classids already in use
+   */
+  private void reacquireContainerClasses(String state) {
+    //At this point we already have already successfully passed
+    //checkIfAlreadyBootstrapped() - so we know that at least the
+    //root classes are in place.
+    String tcClassesStr = state.substring(state.indexOf("class"));
+    //one class per line - the results of the split will need to trimmed
+    String[] tcClasses = Pattern.compile("$", Pattern.MULTILINE)
+        .split(tcClassesStr);
+    Pattern tcClassPattern = Pattern.compile(String.format(
+        "class htb %d:(\\d+) .*", ROOT_QDISC_HANDLE));
+
+    synchronized (classIdSet) {
+      for (String tcClassSplit : tcClasses) {
+        String tcClass = tcClassSplit.trim();
+
+        if (!tcClass.isEmpty()) {
+          Matcher classMatcher = tcClassPattern.matcher(tcClass);
+          if (classMatcher.matches()) {
+            int classId = Integer.parseInt(classMatcher.group(1));
+            if (classId >= MIN_CONTAINER_CLASS_ID) {
+              classIdSet.set(classId - MIN_CONTAINER_CLASS_ID);
+              LOG.info("Reacquired container classid: " + classId);
+            }
+          } else {
+            LOG.warn("Unable to match classid in string:" + tcClass);
+          }
+        }
+      }
+    }
+  }
+
+  public Map<Integer, Integer> readStats() throws ResourceHandlerException {
+    BatchBuilder builder = new BatchBuilder(PrivilegedOperation.
+        OperationType.TC_READ_STATS)
+        .readClasses();
+    PrivilegedOperation op = builder.commitBatchToTempFile();
+
+    try {
+      String output =
+          privilegedOperationExecutor.executePrivilegedOperation(op, true);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("TC stats output:" + output);
+      }
+
+      Map<Integer, Integer> classIdBytesStats = parseStatsString(output);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("classId -> bytes sent %n" + classIdBytesStats);
+      }
+
+      return classIdBytesStats;
+    } catch (PrivilegedOperationException e) {
+      LOG.warn("Failed to get tc stats");
+      throw new ResourceHandlerException("Failed to get tc stats", e);
+    }
+  }
+
+  private Map<Integer, Integer> parseStatsString(String stats) {
+    //Example class stats segment (multiple present in tc output)
+    //  class htb 42:4 parent 42:3 prio 0 rate 1000Kbit ceil 7000Kbit burst1600b cburst 1598b
+    //   Sent 77921300 bytes 52617 pkt (dropped 0, overlimits 0 requeues 0)
+    //   rate 6973Kbit 589pps backlog 0b 39p requeues 0
+    //   lended: 3753 borrowed: 22514 giants: 0
+    //   tokens: -122164 ctokens: -52488
+
+    String[] lines = Pattern.compile("$", Pattern.MULTILINE)
+        .split(stats);
+    Pattern tcClassPattern = Pattern.compile(String.format(
+        "class htb %d:(\\d+) .*", ROOT_QDISC_HANDLE));
+    Pattern bytesPattern = Pattern.compile("Sent (\\d+) bytes.*");
+
+    int currentClassId = -1;
+    Map<Integer, Integer> containerClassIdStats = new HashMap<>();
+
+    for (String lineSplit : lines) {
+      String line = lineSplit.trim();
+
+      if (!line.isEmpty()) {
+        //Check if we encountered a stats segment for a container class
+        Matcher classMatcher = tcClassPattern.matcher(line);
+        if (classMatcher.matches()) {
+          int classId = Integer.parseInt(classMatcher.group(1));
+          if (classId >= MIN_CONTAINER_CLASS_ID) {
+            currentClassId = classId;
+            continue;
+          }
+        }
+
+        //Check if we encountered a stats line
+        Matcher bytesMatcher = bytesPattern.matcher(line);
+        if (bytesMatcher.matches()) {
+          //we found at least one class segment
+          if (currentClassId != -1) {
+            int bytes = Integer.parseInt(bytesMatcher.group(1));
+            containerClassIdStats.put(currentClassId, bytes);
+          } else {
+            LOG.warn("Matched a 'bytes sent' line outside of a class stats " +
+                  "segment : " + line);
+          }
+          continue;
+        }
+
+        //skip other kinds of non-empty lines - since we aren't interested in
+        //them.
+      }
+    }
+
+    return containerClassIdStats;
+  }
+
+  /**
+   * Returns a formatted string for attaching a qdisc to the root of the
+   * device/interface. Additional qdisc
+   * parameters can be supplied - for example, the default 'class' to use for
+   * incoming packets
+   */
+  private String getStringForAddRootQDisc() {
+    return String.format(FORMAT_QDISC_ADD_TO_ROOT_WITH_DEFAULT, device,
+        ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID);
+  }
+
+  /**
+   * Returns a formatted string for a filter that matches packets based on the
+   * presence of net_cls classids
+   */
+  private String getStringForaAddCGroupFilter() {
+    return String.format(FORMAT_FILTER_CGROUP_ADD_TO_PARENT, device,
+        ROOT_QDISC_HANDLE);
+  }
+
+  /**
+   * Get the next available classid. This has to be released post container
+   * complete
+   */
+  public int getNextClassId() throws ResourceHandlerException {
+    synchronized (classIdSet) {
+      int index = classIdSet.nextClearBit(0);
+      if (index >= MAX_CONTAINER_CLASSES) {
+        throw new ResourceHandlerException("Reached max container classes: "
+            + MAX_CONTAINER_CLASSES);
+      }
+      classIdSet.set(index);
+      return (index + MIN_CONTAINER_CLASS_ID);
+    }
+  }
+
+  public void releaseClassId(int classId) throws ResourceHandlerException {
+    synchronized (classIdSet) {
+      int index = classId - MIN_CONTAINER_CLASS_ID;
+      if (index < 0 || index >= MAX_CONTAINER_CLASSES) {
+        throw new ResourceHandlerException("Invalid incoming classId: "
+            + classId);
+      }
+      classIdSet.clear(index);
+    }
+  }
+
+  /**
+   * Returns a formatted string representing the given classId including a
+   * handle
+   */
+  public String getStringForNetClsClassId(int classId) {
+    return String.format(FORMAT_NET_CLS_CLASS_ID, ROOT_QDISC_HANDLE, classId);
+  }
+
+  /**
+   * A value read out of net_cls.classid file is in decimal form. We need to
+   * convert to 32-bit/8 digit hex, extract the lower 16-bit/four digits
+   * as an int
+   */
+  public int getClassIdFromFileContents(String input) {
+    //convert from decimal back to fixed size hex form
+    //e.g 4325381 -> 00420005
+    String classIdStr = String.format("%08x", Integer.parseInt(input));
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ClassId hex string : " + classIdStr);
+    }
+
+    //extract and return 4 digits
+    //e.g 00420005 -> 0005
+    return Integer.parseInt(classIdStr.substring(4));
+  }
+
+  /**
+   * Adds a tc class to qdisc at root
+   */
+  private String getStringForAddClassToRootQDisc(int rateMbit) {
+    String rateMbitStr = rateMbit + MBIT_SUFFIX;
+    //example : "class add dev eth0 parent 42:0 classid 42:1 htb rate 1000mbit
+    // ceil 1000mbit"
+    return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
+        ROOT_QDISC_HANDLE, ZERO_CLASS_ID, ROOT_QDISC_HANDLE, ROOT_CLASS_ID,
+        rateMbitStr, rateMbitStr);
+  }
+
+  private String getStringForAddDefaultClass(int rateMbit, int ceilMbit) {
+    String rateMbitStr = rateMbit + MBIT_SUFFIX;
+    String ceilMbitStr = ceilMbit + MBIT_SUFFIX;
+    //example : "class add dev eth0 parent 42:1 classid 42:2 htb rate 300mbit
+    // ceil 1000mbit"
+    return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
+        ROOT_QDISC_HANDLE, ROOT_CLASS_ID, ROOT_QDISC_HANDLE, DEFAULT_CLASS_ID,
+        rateMbitStr, ceilMbitStr);
+  }
+
+  private String getStringForAddYARNRootClass(int rateMbit, int ceilMbit) {
+    String rateMbitStr = rateMbit + MBIT_SUFFIX;
+    String ceilMbitStr = ceilMbit + MBIT_SUFFIX;
+    //example : "class add dev eth0 parent 42:1 classid 42:3 htb rate 700mbit
+    // ceil 1000mbit"
+    return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
+        ROOT_QDISC_HANDLE, ROOT_CLASS_ID, ROOT_QDISC_HANDLE, YARN_ROOT_CLASS_ID,
+        rateMbitStr, ceilMbitStr);
+  }
+
+  private String getStringForAddContainerClass(int classId, int rateMbit, int
+      ceilMbit) {
+    String rateMbitStr = rateMbit + MBIT_SUFFIX;
+    String ceilMbitStr = ceilMbit + MBIT_SUFFIX;
+    //example : "class add dev eth0 parent 42:99 classid 42:99 htb rate 50mbit
+    // ceil 700mbit"
+    return String.format(FORMAT_CLASS_ADD_TO_PARENT_WITH_RATES, device,
+        ROOT_QDISC_HANDLE, YARN_ROOT_CLASS_ID, ROOT_QDISC_HANDLE, classId,
+        rateMbitStr, ceilMbitStr);
+  }
+
+  private String getStringForDeleteContainerClass(int classId) {
+    //example "class del dev eth0 classid 42:7"
+    return String.format(FORMAT_DELETE_CLASS, device, ROOT_QDISC_HANDLE,
+        classId);
+  }
+
+  private String getStringForReadState() {
+    return String.format(FORMAT_READ_STATE, device);
+  }
+
+  private String getStringForReadClasses() {
+    return String.format(FORMAT_READ_CLASSES, device);
+  }
+
+  private String getStringForWipeState() {
+    return String.format(FORMAT_WIPE_STATE, device);
+  }
+
+  public class BatchBuilder {
+    final PrivilegedOperation operation;
+    final List<String> commands;
+
+    public BatchBuilder(PrivilegedOperation.OperationType opType)
+        throws ResourceHandlerException {
+      switch (opType) {
+      case TC_MODIFY_STATE:
+      case TC_READ_STATE:
+      case TC_READ_STATS:
+        operation = new PrivilegedOperation(opType, (String) null);
+        commands = new ArrayList<>();
+        break;
+      default:
+        throw new ResourceHandlerException("Not a tc operation type : " +
+            opType);
+      }
+    }
+
+    private BatchBuilder addRootQDisc() {
+      commands.add(getStringForAddRootQDisc());
+      return this;
+    }
+
+    private BatchBuilder addCGroupFilter() {
+      commands.add(getStringForaAddCGroupFilter());
+      return this;
+    }
+
+    private BatchBuilder addClassToRootQDisc(int rateMbit) {
+      commands.add(getStringForAddClassToRootQDisc(rateMbit));
+      return this;
+    }
+
+    private BatchBuilder addDefaultClass(int rateMbit, int ceilMbit) {
+      commands.add(getStringForAddDefaultClass(rateMbit, ceilMbit));
+      return this;
+    }
+
+    private BatchBuilder addYARNRootClass(int rateMbit, int ceilMbit) {
+      commands.add(getStringForAddYARNRootClass(rateMbit, ceilMbit));
+      return this;
+    }
+
+    public BatchBuilder addContainerClass(int classId, int rateMbit, boolean
+        strictMode) {
+      int ceilMbit;
+
+      if (strictMode) {
+        ceilMbit = rateMbit;
+      } else {
+        ceilMbit = yarnBandwidthMbit;
+      }
+
+      commands.add(getStringForAddContainerClass(classId, rateMbit, ceilMbit));
+      return this;
+    }
+
+    public BatchBuilder deleteContainerClass(int classId) {
+      commands.add(getStringForDeleteContainerClass(classId));
+      return this;
+    }
+
+    private BatchBuilder readState() {
+      commands.add(getStringForReadState());
+      return this;
+    }
+
+    //We'll read all classes, but use a different tc operation type
+    //when reading stats for all these classes. Stats are fetched using a
+    //different tc cli option (-s).
+
+    private BatchBuilder readClasses() {
+      //We'll read all classes, but use a different tc operation type
+      //for reading stats for all these classes. Stats are fetched using a
+      //different tc cli option (-s).
+      commands.add(getStringForReadClasses());
+      return this;
+    }
+
+    private BatchBuilder wipeState() {
+      commands.add(getStringForWipeState());
+      return this;
+    }
+
+    public PrivilegedOperation commitBatchToTempFile()
+        throws ResourceHandlerException {
+      try {
+        File tcCmds = File.createTempFile(TMP_FILE_PREFIX, TMP_FILE_SUFFIX, new
+            File(tmpDirPath));
+        Writer writer = new OutputStreamWriter(new FileOutputStream(tcCmds),
+            "UTF-8");
+        PrintWriter printWriter = new PrintWriter(writer);
+
+        for (String command : commands) {
+          printWriter.println(command);
+        }
+
+        printWriter.close();
+        operation.appendArgs(tcCmds.getAbsolutePath());
+
+        return operation;
+      } catch (IOException e) {
+        LOG.warn("Failed to create or write to temporary file in dir: " +
+            tmpDirPath);
+        throw new ResourceHandlerException(
+            "Failed to create or write to temporary file in dir: "
+                + tmpDirPath);
+      }
+    }
+  } //end BatchBuilder
+}
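
The classid plumbing is easier to follow with concrete values. With the root qdisc handle of 42 and a hypothetical container classid of 5, preStart() writes "0x00420005" into the container's net_cls.classid file; reading that file back yields the decimal 4325381, from which getClassIdFromFileContents() recovers 5 via the lower four hex digits. A hedged standalone illustration (the class name is made up):

    // Illustrative round trip for a hypothetical container classid of 5.
    public class NetClsClassIdRoundTrip {
      public static void main(String[] args) {
        int rootQdiscHandle = 42;   // ROOT_QDISC_HANDLE
        int containerClassId = 5;

        // What preStart() writes to the container's net_cls.classid file:
        String written =
            String.format("0x%04d%04d", rootQdiscHandle, containerClassId);
        System.out.println(written);                         // 0x00420005

        // The cgroup file reports the value back in decimal: 0x00420005 == 4325381
        int fromFile = Integer.parseInt("4325381");
        String hex = String.format("%08x", fromFile);        // "00420005"
        int recovered = Integer.parseInt(hex.substring(4));  // 5
        System.out.println(recovered);
      }
    }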

+ 78 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestResourceHandlerModule.java

@@ -0,0 +1,78 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestResourceHandlerModule {
+  private static final Log LOG = LogFactory.
+      getLog(TestResourceHandlerModule.class);
+  Configuration emptyConf;
+  Configuration networkEnabledConf;
+
+  @Before
+  public void setup() {
+    emptyConf = new YarnConfiguration();
+    networkEnabledConf = new YarnConfiguration();
+
+    networkEnabledConf.setBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED,
+        true);
+    //We need to bypass mtab parsing for figuring out cgroups mount locations
+    networkEnabledConf.setBoolean(YarnConfiguration
+        .NM_LINUX_CONTAINER_CGROUPS_MOUNT, true);
+  }
+
+  @Test
+  public void testOutboundBandwidthHandler() {
+    try {
+      //This resourceHandler should be non-null only if network as a resource
+      //is explicitly enabled
+      OutboundBandwidthResourceHandler resourceHandler = ResourceHandlerModule
+          .getOutboundBandwidthResourceHandler(emptyConf);
+      Assert.assertNull(resourceHandler);
+
+      //When network as a resource is enabled this should be non-null
+      resourceHandler = ResourceHandlerModule
+          .getOutboundBandwidthResourceHandler(networkEnabledConf);
+      Assert.assertNotNull(resourceHandler);
+
+      //Ensure that outbound bandwidth resource handler is present in the chain
+      ResourceHandlerChain resourceHandlerChain = ResourceHandlerModule
+          .getConfiguredResourceHandlerChain(networkEnabledConf);
+      List<ResourceHandler> resourceHandlers = resourceHandlerChain
+          .getResourceHandlerList();
+      //Exactly one resource handler in chain
+      Assert.assertEquals(1, resourceHandlers.size());
+      //Same instance is expected to be in the chain.
+      Assert.assertTrue(resourceHandlers.get(0) == resourceHandler);
+    } catch (ResourceHandlerException e) {
+      Assert.fail("Unexpected ResourceHandlerException: " + e);
+    }
+  }
+}
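The configuration knobs this test flips are the same ones a deployment would use to turn the feature on. A short sketch of obtaining the handler programmatically, mirroring the test setup above (same imports as the test; exception handling omitted, see the try/catch in testOutboundBandwidthHandler):

    Configuration conf = new YarnConfiguration();
    //Enable network as a resource; it is disabled by default.
    conf.setBoolean(YarnConfiguration.NM_NETWORK_RESOURCE_ENABLED, true);
    //Bypass mtab parsing for cgroup mount discovery, as the test does.
    conf.setBoolean(YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_MOUNT, true);

    //Non-null only when network as a resource is enabled.
    OutboundBandwidthResourceHandler handler =
        ResourceHandlerModule.getOutboundBandwidthResourceHandler(conf);
    //The configured chain contains that same handler instance.
    ResourceHandlerChain chain =
        ResourceHandlerModule.getConfiguredResourceHandlerChain(conf);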

+ 231 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficControlBandwidthHandlerImpl.java

@@ -0,0 +1,231 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.File;
+import java.util.List;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class TestTrafficControlBandwidthHandlerImpl {
+  private static final Log LOG =
+      LogFactory.getLog(TestTrafficControlBandwidthHandlerImpl.class);
+  private static final int ROOT_BANDWIDTH_MBIT = 100;
+  private static final int YARN_BANDWIDTH_MBIT = 70;
+  private static final int TEST_CLASSID = 100;
+  private static final String TEST_CLASSID_STR = "42:100";
+  private static final String TEST_CONTAINER_ID_STR = "container_01";
+  private static final String TEST_TASKS_FILE = "testTasksFile";
+
+  private PrivilegedOperationExecutor privilegedOperationExecutorMock;
+  private CGroupsHandler cGroupsHandlerMock;
+  private TrafficController trafficControllerMock;
+  private Configuration conf;
+  private String tmpPath;
+  private String device;
+  ContainerId containerIdMock;
+  Container containerMock;
+
+  @Before
+  public void setup() {
+    privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class);
+    cGroupsHandlerMock = mock(CGroupsHandler.class);
+    trafficControllerMock = mock(TrafficController.class);
+    conf = new YarnConfiguration();
+    tmpPath = new StringBuffer(System.getProperty("test.build.data")).append
+        ('/').append("hadoop.tmp.dir").toString();
+    device = YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_INTERFACE;
+    containerIdMock = mock(ContainerId.class);
+    containerMock = mock(Container.class);
+    when(containerIdMock.toString()).thenReturn(TEST_CONTAINER_ID_STR);
+    //mock returning a mock - an angel died somewhere.
+    when(containerMock.getContainerId()).thenReturn(containerIdMock);
+
+    conf.setInt(YarnConfiguration
+        .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_MBIT, ROOT_BANDWIDTH_MBIT);
+    conf.setInt(YarnConfiguration
+        .NM_NETWORK_RESOURCE_OUTBOUND_BANDWIDTH_YARN_MBIT, YARN_BANDWIDTH_MBIT);
+    conf.set("hadoop.tmp.dir", tmpPath);
+    //In these tests, we'll only use TrafficController with recovery disabled
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+  }
+
+  @Test
+  public void testBootstrap() {
+    TrafficControlBandwidthHandlerImpl handlerImpl = new
+        TrafficControlBandwidthHandlerImpl(privilegedOperationExecutorMock,
+        cGroupsHandlerMock, trafficControllerMock);
+
+    try {
+      handlerImpl.bootstrap(conf);
+      verify(cGroupsHandlerMock).mountCGroupController(
+          eq(CGroupsHandler.CGroupController.NET_CLS));
+      verifyNoMoreInteractions(cGroupsHandlerMock);
+      verify(trafficControllerMock).bootstrap(eq(device),
+          eq(ROOT_BANDWIDTH_MBIT),
+          eq(YARN_BANDWIDTH_MBIT));
+      verifyNoMoreInteractions(trafficControllerMock);
+    } catch (ResourceHandlerException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected ResourceHandlerException!");
+    }
+  }
+
+  @Test
+  public void testLifeCycle() {
+    TrafficController trafficControllerSpy = spy(new TrafficController(conf,
+        privilegedOperationExecutorMock));
+    TrafficControlBandwidthHandlerImpl handlerImpl = new
+        TrafficControlBandwidthHandlerImpl(privilegedOperationExecutorMock,
+        cGroupsHandlerMock, trafficControllerSpy);
+
+    try {
+      handlerImpl.bootstrap(conf);
+      testPreStart(trafficControllerSpy, handlerImpl);
+      testPostComplete(trafficControllerSpy, handlerImpl);
+    } catch (ResourceHandlerException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected ResourceHandlerException!");
+    }
+  }
+
+  private void testPreStart(TrafficController trafficControllerSpy,
+      TrafficControlBandwidthHandlerImpl handlerImpl) throws
+      ResourceHandlerException {
+    //This is not the cleanest of solutions, but since we are testing the
+    //preStart/postComplete lifecycle we have no other way of handling this:
+    //we do not want to count invocations made through a class we are not
+    //testing here (TrafficController). So we reset this mock; this is not a
+    //problem for the other mocks.
+    reset(privilegedOperationExecutorMock);
+
+    doReturn(TEST_CLASSID).when(trafficControllerSpy).getNextClassId();
+    doReturn(TEST_CLASSID_STR).when(trafficControllerSpy)
+        .getStringForNetClsClassId(TEST_CLASSID);
+    when(cGroupsHandlerMock.getPathForCGroupTasks(CGroupsHandler
+        .CGroupController.NET_CLS, TEST_CONTAINER_ID_STR)).thenReturn(
+        TEST_TASKS_FILE);
+
+    List<PrivilegedOperation> ops = handlerImpl.preStart(containerMock);
+
+    //Ensure that cgroups is created and updated as expected
+    verify(cGroupsHandlerMock).createCGroup(
+        eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR));
+    verify(cGroupsHandlerMock).updateCGroupParam(
+        eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR),
+        eq(CGroupsHandler.CGROUP_PARAM_CLASSID), eq(TEST_CLASSID_STR));
+
+    //Now check the privileged operations being returned
+    //We expect two operations - one for adding pid to tasks file and another
+    //for a tc modify operation
+    Assert.assertEquals(2, ops.size());
+
+    //Verify that the add pid op is correct
+    PrivilegedOperation addPidOp = ops.get(0);
+    String expectedAddPidOpArg = PrivilegedOperation.CGROUP_ARG_PREFIX +
+        TEST_TASKS_FILE;
+    List<String> addPidOpArgs = addPidOp.getArguments();
+
+    Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
+        addPidOp.getOperationType());
+    Assert.assertEquals(1, addPidOpArgs.size());
+    Assert.assertEquals(expectedAddPidOpArg, addPidOpArgs.get(0));
+
+    //Verify that the tc modify op is correct
+    PrivilegedOperation tcModifyOp = ops.get(1);
+    List<String> tcModifyOpArgs = tcModifyOp.getArguments();
+
+    Assert.assertEquals(PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+        tcModifyOp.getOperationType());
+    Assert.assertEquals(1, tcModifyOpArgs.size());
+    //verify that the tc command file exists
+    Assert.assertTrue(new File(tcModifyOpArgs.get(0)).exists());
+  }
+
+  private void testPostComplete(TrafficController trafficControllerSpy,
+      TrafficControlBandwidthHandlerImpl handlerImpl) throws
+      ResourceHandlerException {
+    //This is not the cleanest of solutions, but since we are testing the
+    //preStart/postComplete lifecycle we have no other way of handling this:
+    //we do not want to count invocations made through a class we are not
+    //testing here (TrafficController). So we reset this mock; this is not a
+    //problem for the other mocks.
+    reset(privilegedOperationExecutorMock);
+
+    List<PrivilegedOperation> ops = handlerImpl.postComplete(containerIdMock);
+
+    verify(cGroupsHandlerMock).deleteCGroup(
+        eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR));
+
+    try {
+      //capture privileged op argument and ensure it is correct
+      ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass
+          (PrivilegedOperation.class);
+
+      verify(privilegedOperationExecutorMock)
+          .executePrivilegedOperation(opCaptor.capture(), eq(false));
+
+      List<String> args = opCaptor.getValue().getArguments();
+
+      Assert.assertEquals(PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          opCaptor.getValue().getOperationType());
+      Assert.assertEquals(1, args.size());
+      //ensure that tc command file exists
+      Assert.assertTrue(new File(args.get(0)).exists());
+
+      verify(trafficControllerSpy).releaseClassId(TEST_CLASSID);
+    } catch (PrivilegedOperationException e) {
+      LOG.error("Caught exception: " + e);
+      Assert.fail("Unexpected PrivilegedOperationException from mock!");
+    }
+
+    //We don't expect any operations to be returned here
+    Assert.assertNull(ops);
+  }
+
+  @After
+  public void teardown() {
+    FileUtil.fullyDelete(new File(tmpPath));
+  }
+}
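The two tests above walk the handler through the lifecycle a container launch would drive. A condensed sketch of that lifecycle, assuming the caller already holds the executor, cgroups handler, traffic controller, configuration, and Container objects (how LinuxContainerExecutor actually wires this up is outside this sketch):

    TrafficControlBandwidthHandlerImpl handler =
        new TrafficControlBandwidthHandlerImpl(executor, cGroupsHandler,
            trafficController);
    //One-time setup: mount/locate the net_cls cgroup and initialize tc state.
    handler.bootstrap(conf);

    //Per container, before launch: create its net_cls cgroup and tc class.
    List<PrivilegedOperation> startOps = handler.preStart(container);
    //startOps (add pid to the tasks file, tc modify) are later executed with
    //elevated privileges by the privileged operation executor.

    //Per container, after completion: delete its cgroup and its tc class.
    handler.postComplete(container.getContainerId());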

+ 327 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestTrafficController.java

@@ -0,0 +1,327 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestTrafficController {
+  private static final Log LOG = LogFactory.getLog(TestTrafficController.class);
+  private static final int ROOT_BANDWIDTH_MBIT = 100;
+  private static final int YARN_BANDWIDTH_MBIT = 70;
+  private static final int CONTAINER_BANDWIDTH_MBIT = 10;
+
+  //These constants are closely tied to the implementation of TrafficController
+  //and will have to be modified in tandem with any related TrafficController
+  //changes.
+  private static final String DEVICE = "eth0";
+  private static final String WIPE_STATE_CMD = "qdisc del dev eth0 parent root";
+  private static final String ADD_ROOT_QDISC_CMD =
+      "qdisc add dev eth0 root handle 42: htb default 2";
+  private static final String ADD_CGROUP_FILTER_CMD =
+      "filter add dev eth0 parent 42: protocol ip prio 10 handle 1: cgroup";
+  private static final String ADD_ROOT_CLASS_CMD =
+      "class add dev eth0 parent 42:0 classid 42:1 htb rate 100mbit ceil 100mbit";
+  private static final String ADD_DEFAULT_CLASS_CMD =
+      "class add dev eth0 parent 42:1 classid 42:2 htb rate 30mbit ceil 100mbit";
+  private static final String ADD_YARN_CLASS_CMD =
+      "class add dev eth0 parent 42:1 classid 42:3 htb rate 70mbit ceil 70mbit";
+  private static final String DEFAULT_TC_STATE_EXAMPLE =
+      "qdisc pfifo_fast 0: root refcnt 2 bands 3 priomap  1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1";
+  private static final String READ_QDISC_CMD = "qdisc show dev eth0";
+  private static final String READ_FILTER_CMD = "filter show dev eth0";
+  private static final String READ_CLASS_CMD = "class show dev eth0";
+  private static final int MIN_CONTAINER_CLASS_ID = 4;
+  private static final String FORMAT_CONTAINER_CLASS_STR = "0x0042%04d";
+  private static final String FORMAT_ADD_CONTAINER_CLASS_TO_DEVICE =
+      "class add dev eth0 parent 42:3 classid 42:%d htb rate 10mbit ceil %dmbit";
+  private static final String FORMAT_DELETE_CONTAINER_CLASS_FROM_DEVICE =
+      "class del dev eth0 classid 42:%d";
+
+  private static final int TEST_CLASS_ID = 97;
+  //decimal form of 0x00420097 - when reading a classid file, it is read out
+  //as decimal
+  private static final String TEST_CLASS_ID_DECIMAL_STR = "4325527";
+
+  private Configuration conf;
+  private String tmpPath;
+
+  private PrivilegedOperationExecutor privilegedOperationExecutorMock;
+
+  @Before
+  public void setup() {
+    privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class);
+    conf = new YarnConfiguration();
+    tmpPath = new StringBuffer(System.getProperty("test.build.data")).append
+        ('/').append("hadoop.tmp.dir").toString();
+
+    conf.set("hadoop.tmp.dir", tmpPath);
+  }
+
+  private void verifyTrafficControlOperation(PrivilegedOperation op,
+      PrivilegedOperation.OperationType expectedOpType,
+      List<String> expectedTcCmds)
+      throws IOException {
+    //Verify that the optype matches
+    Assert.assertEquals(expectedOpType, op.getOperationType());
+
+    List<String> args = op.getArguments();
+
+    //Verify that arg count is always 1 (tc command file) for a tc operation
+    Assert.assertEquals(1, args.size());
+
+    File tcCmdsFile = new File(args.get(0));
+
+    //Verify that command file exists
+    Assert.assertTrue(tcCmdsFile.exists());
+
+    List<String> tcCmds = Files.readAllLines(tcCmdsFile.toPath(),
+        Charset.forName("UTF-8"));
+
+    //Verify that the number of commands is the same as expected and verify
+    //that each command is the same, in sequence
+    Assert.assertEquals(expectedTcCmds.size(), tcCmds.size());
+    for (int i = 0; i < tcCmds.size(); ++i) {
+      Assert.assertEquals(expectedTcCmds.get(i), tcCmds.get(i));
+    }
+  }
+
+  @Test
+  public void testBootstrapRecoveryDisabled() {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+    TrafficController trafficController = new TrafficController(conf,
+        privilegedOperationExecutorMock);
+
+    try {
+      trafficController
+          .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+      ArgumentCaptor<PrivilegedOperation> opCaptor = ArgumentCaptor.forClass
+          (PrivilegedOperation.class);
+
+      //NM recovery is disabled, so we expect two privileged operation
+      //executions: one for wiping tc state and a second for initializing state
+      verify(privilegedOperationExecutorMock, times(2))
+          .executePrivilegedOperation(opCaptor.capture(), eq(false));
+
+      //Now verify that the two operations were correct
+      List<PrivilegedOperation> ops = opCaptor.getAllValues();
+
+      verifyTrafficControlOperation(ops.get(0),
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(WIPE_STATE_CMD));
+
+      verifyTrafficControlOperation(ops.get(1),
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(ADD_ROOT_QDISC_CMD, ADD_CGROUP_FILTER_CMD,
+              ADD_ROOT_CLASS_CMD, ADD_DEFAULT_CLASS_CMD, ADD_YARN_CLASS_CMD));
+    } catch (ResourceHandlerException | PrivilegedOperationException |
+        IOException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected exception: "
+          + e.getClass().getSimpleName());
+    }
+  }
+
+  @Test
+  public void testBootstrapRecoveryEnabled() {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+
+    TrafficController trafficController = new TrafficController(conf,
+        privilegedOperationExecutorMock);
+
+    try {
+      //Return a default tc state when attempting to read state
+      when(privilegedOperationExecutorMock.executePrivilegedOperation(
+          any(PrivilegedOperation.class), eq(true)))
+          .thenReturn(DEFAULT_TC_STATE_EXAMPLE);
+
+      trafficController
+          .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+      ArgumentCaptor<PrivilegedOperation> readOpCaptor = ArgumentCaptor.forClass
+          (PrivilegedOperation.class);
+
+      //NM recovery is enabled, so we expect three privileged operation
+      //executions: 1) read tc state 2) wipe tc state 3) init tc state
+      //First, verify the read op
+      verify(privilegedOperationExecutorMock, times(1))
+          .executePrivilegedOperation(readOpCaptor.capture(), eq(true));
+      List<PrivilegedOperation> readOps = readOpCaptor.getAllValues();
+      verifyTrafficControlOperation(readOps.get(0),
+          PrivilegedOperation.OperationType.TC_READ_STATE,
+          Arrays.asList(READ_QDISC_CMD, READ_FILTER_CMD, READ_CLASS_CMD));
+
+      ArgumentCaptor<PrivilegedOperation> writeOpCaptor = ArgumentCaptor
+          .forClass(PrivilegedOperation.class);
+      verify(privilegedOperationExecutorMock, times(2))
+          .executePrivilegedOperation(writeOpCaptor.capture(), eq(false));
+      //Now verify that the two write operations were correct
+      List<PrivilegedOperation> writeOps = writeOpCaptor.getAllValues();
+      verifyTrafficControlOperation(writeOps.get(0),
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(WIPE_STATE_CMD));
+
+      verifyTrafficControlOperation(writeOps.get(1),
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(ADD_ROOT_QDISC_CMD, ADD_CGROUP_FILTER_CMD,
+              ADD_ROOT_CLASS_CMD, ADD_DEFAULT_CLASS_CMD, ADD_YARN_CLASS_CMD));
+    } catch (ResourceHandlerException | PrivilegedOperationException |
+        IOException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected exception: "
+          + e.getClass().getSimpleName());
+    }
+  }
+
+  @Test
+  public void testInvalidBuilder() {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+    TrafficController trafficController = new TrafficController(conf,
+        privilegedOperationExecutorMock);
+    try {
+      trafficController
+          .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+      try {
+        //Invalid op type for TC batch builder
+        TrafficController.BatchBuilder invalidBuilder = trafficController.
+            new BatchBuilder(
+            PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP);
+        Assert.fail("Invalid builder check failed!");
+      } catch (ResourceHandlerException e) {
+        //expected
+      }
+    } catch (ResourceHandlerException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected exception: "
+          + e.getClass().getSimpleName());
+    }
+  }
+
+  @Test
+  public void testClassIdFileContentParsing() {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+    TrafficController trafficController = new TrafficController(conf,
+        privilegedOperationExecutorMock);
+
+    //Verify that classid file contents are parsed correctly
+    //This call strips the QDISC prefix and returns the classid associated with
+    //the container
+    int parsedClassId = trafficController.getClassIdFromFileContents
+        (TEST_CLASS_ID_DECIMAL_STR);
+
+    Assert.assertEquals(TEST_CLASS_ID, parsedClassId);
+  }
+
+  @Test
+  public void testContainerOperations() {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+
+    TrafficController trafficController = new TrafficController(conf,
+        privilegedOperationExecutorMock);
+    try {
+      trafficController
+          .bootstrap(DEVICE, ROOT_BANDWIDTH_MBIT, YARN_BANDWIDTH_MBIT);
+
+      int classId = trafficController.getNextClassId();
+
+      Assert.assertTrue(classId >= MIN_CONTAINER_CLASS_ID);
+      Assert.assertEquals(String.format(FORMAT_CONTAINER_CLASS_STR, classId),
+          trafficController.getStringForNetClsClassId(classId));
+
+      //Verify that the operation is setup correctly with strictMode = false
+      TrafficController.BatchBuilder builder = trafficController.
+          new BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
+          .addContainerClass(classId, CONTAINER_BANDWIDTH_MBIT, false);
+      PrivilegedOperation addClassOp = builder.commitBatchToTempFile();
+
+      String expectedAddClassCmd = String.format
+          (FORMAT_ADD_CONTAINER_CLASS_TO_DEVICE, classId, YARN_BANDWIDTH_MBIT);
+      verifyTrafficControlOperation(addClassOp,
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(expectedAddClassCmd));
+
+      //Verify that the operation is setup correctly with strictMode = true
+      TrafficController.BatchBuilder strictModeBuilder = trafficController.
+          new BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
+          .addContainerClass(classId, CONTAINER_BANDWIDTH_MBIT, true);
+      PrivilegedOperation addClassStrictModeOp = strictModeBuilder
+          .commitBatchToTempFile();
+
+      String expectedAddClassStrictModeCmd = String.format
+          (FORMAT_ADD_CONTAINER_CLASS_TO_DEVICE, classId,
+              CONTAINER_BANDWIDTH_MBIT);
+      verifyTrafficControlOperation(addClassStrictModeOp,
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(expectedAddClassStrictModeCmd));
+
+      TrafficController.BatchBuilder deleteBuilder = trafficController.new
+          BatchBuilder(PrivilegedOperation.OperationType.TC_MODIFY_STATE)
+          .deleteContainerClass(classId);
+      PrivilegedOperation deleteClassOp = deleteBuilder.commitBatchToTempFile();
+
+      String expectedDeleteClassCmd = String.format
+          (FORMAT_DELETE_CONTAINER_CLASS_FROM_DEVICE, classId);
+      verifyTrafficControlOperation(deleteClassOp,
+          PrivilegedOperation.OperationType.TC_MODIFY_STATE,
+          Arrays.asList(expectedDeleteClassCmd));
+    } catch (ResourceHandlerException | IOException e) {
+      LOG.error("Unexpected exception: " + e);
+      Assert.fail("Caught unexpected exception: "
+          + e.getClass().getSimpleName());
+    }
+  }
+
+  @After
+  public void teardown() {
+    FileUtil.fullyDelete(new File(tmpPath));
+  }
+}
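The classid round trip that testClassIdFileContentParsing relies on can be worked through by hand. The sketch below assumes the 0x0042 prefix and %04d formatting from the constants above; it mirrors what getClassIdFromFileContents is expected to do conceptually, not its exact implementation:

    int classId = 97;
    //Written to net_cls.classid as "0x00420097" (42: root handle + 4-digit id).
    String classIdStr = String.format("0x0042%04d", classId);
    //The kernel reports the same value back in decimal form: 4325527.
    long reported = Long.decode(classIdStr);
    //Strip the 0x0042 prefix and read the trailing digits back as a decimal id.
    String hex = Long.toHexString(reported);            // "420097"
    int recovered = Integer.parseInt(hex.substring(2)); // 97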