浏览代码

MAPREDUCE-6773. Implement RM Container Reuse Requestor to handle the reuse containers for resource requests. Devaraj K

Naganarasimha 8 年之前
父节点
当前提交
e274d508ff

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -972,7 +972,7 @@ public class MRAppMaster extends CompositeService {
             , containerID);
       } else {
         this.containerAllocator = new RMContainerAllocator(
-            this.clientService, this.context, preemptionPolicy);
+            this.clientService, this.context, preemptionPolicy, dispatcher);
       }
       ((Service)this.containerAllocator).init(getConfig());
       ((Service)this.containerAllocator).start();

+ 47 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerAvailableEvent.java

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event class for ContainerRequestor.
+ */
+public class ContainerAvailableEvent
+    extends AbstractEvent<RMContainerReuseRequestor.EventType> {
+
+  private final TaskAttemptId taskAttemptId;
+  private final Container container;
+
+  public ContainerAvailableEvent(RMContainerReuseRequestor.EventType eventType,
+      TaskAttemptId taskAttemptId, Container container) {
+    super(eventType);
+    this.taskAttemptId = taskAttemptId;
+    this.container = container;
+  }
+
+  public TaskAttemptId getTaskAttemptId() {
+    return taskAttemptId;
+  }
+
+  public Container getContainer() {
+    return container;
+  }
+}

+ 13 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
@@ -198,10 +199,13 @@ public class RMContainerAllocator extends RMCommunicator
 
   private String reduceNodeLabelExpression;
 
+  private Dispatcher dispatcher;
+
   public RMContainerAllocator(ClientService clientService, AppContext context,
-      AMPreemptionPolicy preemptionPolicy) {
+      AMPreemptionPolicy preemptionPolicy, Dispatcher dispatcher) {
     super(clientService, context);
     this.preemptionPolicy = preemptionPolicy;
+    this.dispatcher = dispatcher;
     this.stopped = new AtomicBoolean(false);
     this.clock = context.getClock();
     this.assignedRequests = createAssignedRequests();
@@ -247,7 +251,14 @@ public class RMContainerAllocator extends RMCommunicator
             MRJobConfig.DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PERCENT));
     LOG.info(this.scheduledRequests.getNumOpportunisticMapsPercent() +
         "% of the mappers will be scheduled using OPPORTUNISTIC containers");
-    containerRequestor = new RMContainerRequestor(this);
+    if (conf.getBoolean(MRJobConfig.MR_AM_CONTAINER_REUSE_ENABLED,
+        MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_ENABLED)) {
+      containerRequestor = new RMContainerReuseRequestor(eventHandler, this);
+      dispatcher.register(RMContainerReuseRequestor.EventType.class,
+          (RMContainerReuseRequestor) containerRequestor);
+    } else {
+      containerRequestor = new RMContainerRequestor(this);
+    }
     containerRequestor.init(conf);
   }
 

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java

@@ -79,7 +79,7 @@ public class RMContainerRequestor extends AbstractService
   //Value->Map
   //Key->Resource Capability
   //Value->ResourceRequest
-  private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
+  protected final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
   remoteRequestsTable =
       new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
 

+ 224 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java

@@ -0,0 +1,224 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Keeps the data for RMContainer's reuse.
+ */
+public class RMContainerReuseRequestor extends RMContainerRequestor
+    implements EventHandler<ContainerAvailableEvent> {
+  private static final Log LOG = LogFactory
+      .getLog(RMContainerReuseRequestor.class);
+
+  private Map<Container, String> containersToReuse =
+      new ConcurrentHashMap<Container, String>();
+  private Map<ContainerId, List<TaskAttemptId>> containerToTaskAttemptsMap =
+      new HashMap<ContainerId, List<TaskAttemptId>>();
+  private int containerReuseMaxMapTasks;
+  private int containerReuseMaxReduceTasks;
+  private int maxMapTaskContainers;
+  private int maxReduceTaskContainers;
+  private int noOfMapTaskContainersForReuse;
+  private int noOfReduceTaskContainersForReuse;
+
+  private RMCommunicator rmCommunicator;
+
+  public RMContainerReuseRequestor(
+      EventHandler<ContainerAvailableEvent> eventHandler,
+      RMCommunicator rmCommunicator) {
+    super(rmCommunicator);
+    this.rmCommunicator = rmCommunicator;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    containerReuseMaxMapTasks = conf.getInt(
+        MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKS,
+        MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_MAPTASKS);
+    containerReuseMaxReduceTasks = conf.getInt(
+        MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKS,
+        MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_REDUCETASKS);
+    maxMapTaskContainers = conf.getInt(
+        MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKCONTAINERS,
+        MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_MAPTASKCONTAINERS);
+    maxReduceTaskContainers = conf.getInt(
+        MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKCONTAINERS,
+        MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_REDUCETASKCONTAINERS);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  public AllocateResponse makeRemoteRequest()
+      throws YarnException, IOException {
+    AllocateResponse amResponse = super.makeRemoteRequest();
+    synchronized (containersToReuse) {
+      List<Container> allocatedContainers = amResponse.getAllocatedContainers();
+      allocatedContainers.addAll(containersToReuse.keySet());
+      containersToReuse.clear();
+    }
+    return amResponse;
+  }
+
+  @Override
+  public void containerFailedOnHost(String hostName) {
+    super.containerFailedOnHost(hostName);
+    boolean blacklisted = super.isNodeBlacklisted(hostName);
+    if (blacklisted) {
+      Set<Container> containersOnHost = new HashSet<Container>();
+      for (Entry<Container, String> elem : containersToReuse.entrySet()) {
+        if (elem.getValue().equals(hostName)) {
+          containersOnHost.add(elem.getKey());
+        }
+      }
+      for (Container container : containersOnHost) {
+        containersToReuse.remove(container);
+      }
+    }
+  }
+
+  @Override
+  public void handle(ContainerAvailableEvent event) {
+    Container container = event.getContainer();
+    ContainerId containerId = container.getId();
+    String resourceName = container.getNodeId().getHost();
+    boolean canReuse = false;
+    Priority priority = container.getPriority();
+    if (RMContainerAllocator.PRIORITY_MAP.equals(priority)
+        || RMContainerAllocator.PRIORITY_REDUCE.equals(priority)) {
+      List<TaskAttemptId> containerTaskAttempts = null;
+      containerTaskAttempts = containerToTaskAttemptsMap.get(containerId);
+      if (containerTaskAttempts == null) {
+        containerTaskAttempts = new ArrayList<TaskAttemptId>();
+        containerToTaskAttemptsMap.put(containerId, containerTaskAttempts);
+      }
+      if (checkMapContainerReuseConstraints(priority, containerTaskAttempts)
+          || checkReduceContainerReuseConstraints(priority,
+              containerTaskAttempts)) {
+        Map<String, Map<Resource, ResourceRequest>> resourceRequests =
+            remoteRequestsTable.get(priority);
+        // If there are any eligible requests
+        if (resourceRequests != null && !resourceRequests.isEmpty()) {
+          canReuse = true;
+          containerTaskAttempts.add(event.getTaskAttemptId());
+        }
+      }
+      ((RMContainerAllocator) rmCommunicator)
+          .resetContainerForReuse(container.getId());
+      if (canReuse) {
+        containersToReuse.put(container, resourceName);
+        incrementRunningReuseContainers(priority);
+        LOG.info("Adding the " + containerId + " for reuse.");
+      } else {
+        LOG.info("Releasing the container : " + containerId
+            + " since it is not eligible for reuse or no pending requests.");
+        containerComplete(container);
+        pendingRelease.add(containerId);
+        release(containerId);
+      }
+    }
+  }
+
+  private boolean checkMapContainerReuseConstraints(Priority priority,
+      List<TaskAttemptId> containerTaskAttempts) {
+    return RMContainerAllocator.PRIORITY_MAP.equals(priority)
+        // Check for how many tasks can map task container run maximum
+        && ((containerTaskAttempts.size() < containerReuseMaxMapTasks
+            || containerReuseMaxMapTasks == -1)
+            // Check for no of map task containers running
+            && (noOfMapTaskContainersForReuse < maxMapTaskContainers
+                || maxMapTaskContainers == -1));
+  }
+
+  private boolean checkReduceContainerReuseConstraints(Priority priority,
+      List<TaskAttemptId> containerTaskAttempts) {
+    return RMContainerAllocator.PRIORITY_REDUCE.equals(priority)
+        // Check for how many tasks can reduce task container run maximum
+        && ((containerTaskAttempts.size() < containerReuseMaxReduceTasks
+            || containerReuseMaxReduceTasks == -1)
+            // Check for no of reduce task containers running
+            && (noOfReduceTaskContainersForReuse < maxReduceTaskContainers
+                || maxReduceTaskContainers == -1));
+  }
+
+  private void containerComplete(Container container) {
+    if (!containerToTaskAttemptsMap.containsKey(container.getId())) {
+      return;
+    }
+    containerToTaskAttemptsMap.remove(container.getId());
+    if (RMContainerAllocator.PRIORITY_MAP.equals(container.getPriority())) {
+      noOfMapTaskContainersForReuse--;
+    } else if (RMContainerAllocator.PRIORITY_REDUCE
+        .equals(container.getPriority())) {
+      noOfReduceTaskContainersForReuse--;
+    }
+  }
+
+  private void incrementRunningReuseContainers(Priority priority) {
+    if (RMContainerAllocator.PRIORITY_MAP.equals(priority)) {
+      noOfMapTaskContainersForReuse++;
+    } else if (RMContainerAllocator.PRIORITY_REDUCE.equals(priority)) {
+      noOfReduceTaskContainersForReuse++;
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  Map<Container, String> getContainersToReuse() {
+    return containersToReuse;
+  }
+
+  /**
+   * Container Available EventType.
+   */
+  public static enum EventType {
+    CONTAINER_AVAILABLE
+  }
+}

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java

@@ -209,7 +209,7 @@ public class MRAppBenchmark {
           ClientService clientService, AppContext context) {
 
         AMPreemptionPolicy policy = new NoopAMPreemptionPolicy();
-        return new RMContainerAllocator(clientService, context, policy) {
+        return new RMContainerAllocator(clientService, context, policy, null) {
           @Override
           protected ApplicationMasterProtocol createSchedulerProxy() {
             return new ApplicationMasterProtocol() {

+ 9 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java

@@ -2027,7 +2027,8 @@ public class TestRMContainerAllocator {
     // Use this constructor when using a real job.
     MyContainerAllocator(MyResourceManager rm,
         ApplicationAttemptId appAttemptId, AppContext context) {
-      super(createMockClientService(), context, new NoopAMPreemptionPolicy());
+      super(createMockClientService(), context, new NoopAMPreemptionPolicy(),
+          null);
       this.rm = rm;
     }
 
@@ -2035,7 +2036,7 @@ public class TestRMContainerAllocator {
     public MyContainerAllocator(MyResourceManager rm, Configuration conf,
         ApplicationAttemptId appAttemptId, Job job) {
       super(createMockClientService(), createAppContext(appAttemptId, job),
-          new NoopAMPreemptionPolicy());
+          new NoopAMPreemptionPolicy(), null);
       this.rm = rm;
       super.init(conf);
       super.start();
@@ -2045,7 +2046,7 @@ public class TestRMContainerAllocator {
         ApplicationAttemptId appAttemptId, Job job, Clock clock) {
       super(createMockClientService(),
           createAppContext(appAttemptId, job, clock),
-          new NoopAMPreemptionPolicy());
+          new NoopAMPreemptionPolicy(), null);
       this.rm = rm;
       super.init(conf);
       super.start();
@@ -2166,7 +2167,7 @@ public class TestRMContainerAllocator {
     AMPreemptionPolicy policy = mock(AMPreemptionPolicy.class);
     when(communicator.getJob()).thenReturn(mockJob);
     RMContainerAllocator allocator = new RMContainerAllocator(service, context,
-        policy);
+        policy, null);
     AllocateResponse response = Records.newRecord(AllocateResponse.class);
     allocator.handleJobPriorityChange(response);
   }
@@ -2369,7 +2370,7 @@ public class TestRMContainerAllocator {
 
     RMContainerAllocator allocator = new RMContainerAllocator(
         mock(ClientService.class), appContext,
-        new NoopAMPreemptionPolicy()) {
+        new NoopAMPreemptionPolicy(), null) {
           @Override
           protected void register() {
           }
@@ -2420,8 +2421,8 @@ public class TestRMContainerAllocator {
   public void testCompletedContainerEvent() {
     RMContainerAllocator allocator = new RMContainerAllocator(
         mock(ClientService.class), mock(AppContext.class),
-        new NoopAMPreemptionPolicy());
-
+        new NoopAMPreemptionPolicy(), null);
+    
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
         MRBuilderUtils.newTaskId(
             MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
@@ -3319,7 +3320,7 @@ public class TestRMContainerAllocator {
       extends RMContainerAllocator {
     public RMContainerAllocatorForFinishedContainer(ClientService clientService,
         AppContext context, AMPreemptionPolicy preemptionPolicy) {
-      super(clientService, context, preemptionPolicy);
+      super(clientService, context, preemptionPolicy, null);
     }
     @Override
     protected AssignedRequests createAssignedRequests() {

+ 307 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java

@@ -0,0 +1,307 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.EventType;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for RMContainerReuseRequestor.
+ */
+public class TestRMContainerReuseRequestor {
+
+  private RMContainerReuseRequestor reuseRequestor;
+
+  @Before
+  public void setup() throws IOException {
+    reuseRequestor = new RMContainerReuseRequestor(null,
+        mock(RMContainerAllocator.class));
+  }
+
+  @Test
+  public void testNoOfTimesEachMapTaskContainerCanReuseWithDefaultConfig() {
+    // Verify that no of times each map task container can be reused with
+    // default configuration for
+    // 'yarn.app.mapreduce.am.container.reuse.max-maptasks'.
+    testNoOfTimesEachContainerCanReuseWithDefaultConfig(TaskType.MAP,
+        RMContainerAllocator.PRIORITY_MAP);
+  }
+
+  @Test
+  public void testNoOfTimesEachMapTaskContainerCanReuseWithConfigLimit() {
+    // Verify that no of times each map task container can be reused when
+    // 'yarn.app.mapreduce.am.container.reuse.max-maptasks' configured with a
+    // value.
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKS, 1);
+    testNoOfTimesEachContainerCanReuseWithConfigLimit(TaskType.MAP,
+        RMContainerAllocator.PRIORITY_MAP, conf);
+  }
+
+  @Test
+  public void testNoOfTimesEachRedTaskContainerCanReuseWithDefaultConfig() {
+    // Verify that no of times each reduce task container can be reused with
+    // default configuration for
+    // 'yarn.app.mapreduce.am.container.reuse.max-reducetasks'.
+    testNoOfTimesEachContainerCanReuseWithDefaultConfig(TaskType.REDUCE,
+        RMContainerAllocator.PRIORITY_REDUCE);
+  }
+
+  @Test
+  public void testNoOfTimesEachRedTaskContainerCanReuseWithConfigLimit() {
+    // Verify that no of times each map task container can be reused when
+    // 'yarn.app.mapreduce.am.container.reuse.max-reducetasks' configured with a
+    // value.
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKS, 1);
+    testNoOfTimesEachContainerCanReuseWithConfigLimit(TaskType.REDUCE,
+        RMContainerAllocator.PRIORITY_REDUCE, conf);
+  }
+
+  @Test
+  public void testNoOfMaxMapTaskContainersCanReuseWithDefaultConfig() {
+    // Verify that no of maximum map containers can be reused at any time with
+    // default configuration for
+    // 'yarn.app.mapreduce.am.container.reuse.max-maptaskcontainers'.
+    testNoOfMaxContainersCanReuseWithDefaultConfig(TaskType.MAP,
+        RMContainerAllocator.PRIORITY_MAP);
+  }
+
+  @Test
+  public void testNoOfMaxMapTaskContainersCanReuseWithConfigLimit() {
+    // Verify that no of maximum map containers can be reused at any time when
+    // 'yarn.app.mapreduce.am.container.reuse.max-maptaskcontainers' configured
+    // with a limit value.
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKCONTAINERS, 1);
+    testNoOfMaxContainersCanReuseWithConfigLimit(TaskType.MAP,
+        RMContainerAllocator.PRIORITY_MAP, conf);
+  }
+
+  @Test
+  public void testNoOfMaxRedTaskContainersCanReuseWithDefaultConfig() {
+    // Verify that no of maximum reduce containers can be reused at any time
+    // with default configuration for
+    // 'yarn.app.mapreduce.am.container.reuse.max-reducetasks'.
+    testNoOfMaxContainersCanReuseWithDefaultConfig(TaskType.REDUCE,
+        RMContainerAllocator.PRIORITY_REDUCE);
+  }
+
+  @Test
+  public void testNoOfMaxRedTaskContainersCanReuseWithConfigLimit() {
+    // Verify that no of maximum reduce containers can be reused at any time
+    // when 'yarn.app.mapreduce.am.container.reuse.max-reducetasks' configured
+    // with a limit value.
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKCONTAINERS, 1);
+    testNoOfMaxContainersCanReuseWithConfigLimit(TaskType.REDUCE,
+        RMContainerAllocator.PRIORITY_REDUCE, conf);
+  }
+
+  @Test
+  public void testContainerFailedOnHost() throws Exception {
+    reuseRequestor.serviceInit(new Configuration());
+    Map<Container, String> containersToReuse = reuseRequestor
+        .getContainersToReuse();
+    containersToReuse
+        .put(newContainerInstance("container_1472171035081_0009_01_000008",
+            RMContainerAllocator.PRIORITY_REDUCE), "node1");
+    containersToReuse
+        .put(newContainerInstance("container_1472171035081_0009_01_000009",
+            RMContainerAllocator.PRIORITY_REDUCE), "node2");
+    reuseRequestor.getBlacklistedNodes().add("node1");
+    // It removes all containers from containersToReuse running in node1
+    reuseRequestor.containerFailedOnHost("node1");
+    Assert.assertFalse("node1 should not present in reuse containers.",
+        containersToReuse.containsValue("node1"));
+    // There will not any change to containersToReuse when there are no
+    // containers to reuse in that node
+    reuseRequestor.containerFailedOnHost("node3");
+    Assert.assertEquals(1, containersToReuse.size());
+  }
+
+  private void testNoOfTimesEachContainerCanReuseWithDefaultConfig(
+      TaskType taskType, Priority priority) {
+    // Verify that no of times each container can be reused
+
+    // Add 10 container reqs to the requestor
+    addContainerReqs(priority);
+    Container container = newContainerInstance(
+        "container_123456789_0001_01_000002", priority);
+    for (int i = 0; i < 10; i++) {
+      JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1);
+      TaskId taskId = MRBuilderUtils.newTaskId(jobId, i + 1, taskType);
+      TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 1);
+      ContainerAvailableEvent event = new ContainerAvailableEvent(
+          EventType.CONTAINER_AVAILABLE, taskAttemptId, container);
+      reuseRequestor.handle(event);
+      Map<Container, String> containersToReuse = reuseRequestor
+          .getContainersToReuse();
+      Assert.assertTrue("Container should be added for reuse.",
+          containersToReuse.containsKey(container));
+    }
+  }
+
+  private void testNoOfTimesEachContainerCanReuseWithConfigLimit(
+      TaskType taskType, Priority priority, Configuration conf) {
+    reuseRequestor.init(conf);
+    // Add a container request
+    ContainerRequest req1 = new ContainerRequest(null,
+        Resource.newInstance(2048, 1), new String[0], new String[0], priority,
+        null);
+    reuseRequestor.addContainerReq(req1);
+    // Add an another container request
+    ContainerRequest req2 = new ContainerRequest(null,
+        Resource.newInstance(2048, 1), new String[0], new String[0], priority,
+        null);
+    reuseRequestor.addContainerReq(req2);
+
+    EventType eventType = EventType.CONTAINER_AVAILABLE;
+    Container container = newContainerInstance(
+        "container_123456789_0001_01_000002", priority);
+    JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1);
+    TaskId taskId1 = MRBuilderUtils.newTaskId(jobId, 1, taskType);
+    TaskAttemptId taskAttemptId1 = MRBuilderUtils.newTaskAttemptId(taskId1, 1);
+
+    TaskId taskId2 = MRBuilderUtils.newTaskId(jobId, 2, taskType);
+    TaskAttemptId taskAttemptId2 = MRBuilderUtils.newTaskAttemptId(taskId2, 1);
+
+    ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType,
+        taskAttemptId1, container);
+    reuseRequestor.handle(event1);
+    Map<Container, String> containersToReuse = reuseRequestor
+        .getContainersToReuse();
+    // It is reusing the container
+    Assert.assertTrue("Container should be added for reuse.",
+        containersToReuse.containsKey(container));
+    containersToReuse.clear();
+    ContainerAvailableEvent event2 = new ContainerAvailableEvent(eventType,
+        taskAttemptId2, container);
+    reuseRequestor.handle(event2);
+    // It should not be reused since it has already reused and limit value is 1.
+    Assert.assertFalse("Container should not be added for reuse.",
+        containersToReuse.containsKey(container));
+  }
+
+  private void testNoOfMaxContainersCanReuseWithDefaultConfig(TaskType taskType,
+      Priority priority) {
+    // It tests no of times each container can be reused
+
+    // Add 10 container reqs to the requestor
+    addContainerReqs(priority);
+    for (int i = 0; i < 10; i++) {
+      Container container = newContainerInstance(
+          "container_123456789_0001_01_00000" + (i + 2), priority);
+      JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1);
+      TaskId taskId1 = MRBuilderUtils.newTaskId(jobId, i + 1, taskType);
+      TaskAttemptId taskAttemptId1 = MRBuilderUtils.newTaskAttemptId(taskId1,
+          1);
+      ContainerAvailableEvent event1 = new ContainerAvailableEvent(
+          EventType.CONTAINER_AVAILABLE, taskAttemptId1, container);
+      reuseRequestor.handle(event1);
+      Map<Container, String> containersToReuse = reuseRequestor
+          .getContainersToReuse();
+      Assert.assertTrue("Container should be added for reuse.",
+          containersToReuse.containsKey(container));
+    }
+  }
+
+  private void testNoOfMaxContainersCanReuseWithConfigLimit(TaskType taskType,
+      Priority priority, Configuration conf) {
+    reuseRequestor.init(conf);
+    ContainerRequest req1 = new ContainerRequest(null,
+        Resource.newInstance(2048, 1), new String[0], new String[0], priority,
+        null);
+    reuseRequestor.addContainerReq(req1);
+
+    ContainerRequest req2 = new ContainerRequest(null,
+        Resource.newInstance(2048, 1), new String[0], new String[0], priority,
+        null);
+    reuseRequestor.addContainerReq(req2);
+
+    EventType eventType = EventType.CONTAINER_AVAILABLE;
+    Container container1 = newContainerInstance(
+        "container_123456789_0001_01_000002", priority);
+    JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1);
+    TaskId taskId1 = MRBuilderUtils.newTaskId(jobId, 1, taskType);
+    TaskAttemptId taskAttemptId1 = MRBuilderUtils.newTaskAttemptId(taskId1, 1);
+
+    TaskId taskId2 = MRBuilderUtils.newTaskId(jobId, 2, taskType);
+    TaskAttemptId taskAttemptId2 = MRBuilderUtils.newTaskAttemptId(taskId2, 1);
+
+    ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType,
+        taskAttemptId1, container1);
+    reuseRequestor.handle(event1);
+    Map<Container, String> containersToReuse = reuseRequestor
+        .getContainersToReuse();
+    Assert.assertTrue("Container should be added for reuse.",
+        containersToReuse.containsKey(container1));
+    containersToReuse.clear();
+    Container container2 = newContainerInstance(
+        "container_123456789_0001_01_000003", priority);
+    ContainerAvailableEvent event2 = new ContainerAvailableEvent(eventType,
+        taskAttemptId2, container2);
+    reuseRequestor.handle(event2);
+    Assert.assertFalse("Container should not be added for reuse.",
+        containersToReuse.containsKey(container2));
+  }
+
+  private void addContainerReqs(Priority priority) {
+    Configuration conf = new Configuration();
+    reuseRequestor.init(conf);
+    for (int i = 0; i < 10; i++) {
+      ContainerRequest req = new ContainerRequest(null,
+          Resource.newInstance(2048, 1), new String[0], new String[0], priority,
+          null);
+      reuseRequestor.addContainerReq(req);
+    }
+  }
+
+  private Container newContainerInstance(String containerId,
+      Priority priority) {
+    return Container.newInstance(ContainerId.fromString(containerId),
+        NodeId.newInstance("node1", 8080), "", null, priority, null);
+  }
+
+  @After
+  public void tearDown() {
+    reuseRequestor.stop();
+  }
+}