Merge MAPREDUCE-3596 from trunk. Fix scheduler to handle cleaned up containers, which NMs may subsequently report as running. (Contributed by Vinod Kumar Vavilapalli)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1231303 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 13 years ago
parent
commit
a8ba510900
14 changed files with 234 additions and 72 deletions
  1. +3 -0
      hadoop-mapreduce-project/CHANGES.txt
  2. +11 -0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
  3. +3 -7
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
  4. +5 -5
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  5. +2 -2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  6. +2 -2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
  7. +27 -30
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  8. +14 -5
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
  9. +5 -1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  10. +7 -2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  11. +7 -9
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
  12. +2 -2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
  13. +145 -5
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
  14. +1 -2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
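
Before the per-file diffs, here is the shape of the race being fixed, as a toy sketch: the RM releases a container and queues a cleanup order for the NM, but the NM's next heartbeat can still report that container as RUNNING. The fix makes the scheduler treat such an unknown container as something to (re)schedule for cleanup, rather than assume it still tracks an RMContainer for it. All types below are simplified stand-ins for illustration, not the real YARN classes.

```java
import java.util.HashSet;
import java.util.Set;

// Toy, self-contained model of the race this patch fixes.
class ToyScheduler {
  private final Set<String> liveContainers = new HashSet<String>();
  // Cleanup orders handed to the NM on its next heartbeat response.
  private final Set<String> containersToClean = new HashSet<String>();

  void allocate(String containerId) {
    liveContainers.add(containerId);
  }

  void release(String containerId) {
    liveContainers.remove(containerId);
    containersToClean.add(containerId); // ask the NM to kill it
  }

  // The NM's heartbeat reports this container as RUNNING, possibly
  // after it was already released (the report raced with the cleanup
  // order). Before the patch the scheduler assumed it still tracked
  // the container; after the patch an unknown container is simply
  // (re)scheduled for cleanup.
  void containerLaunchedOnNode(String containerId) {
    if (!liveContainers.contains(containerId)) {
      containersToClean.add(containerId); // kill the unknown container
      return;
    }
    // ... normal LAUNCHED handling ...
  }
}
```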

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -424,6 +424,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3625. CapacityScheduler web-ui display of queue's used capacity is broken.
     (Jason Lowe via mahadev)
 
+    MAPREDUCE-3596. Fix scheduler to handle cleaned up containers, which NMs
+    may subsequently report as running. (Vinod Kumar Vavilapalli via sseth)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 11 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java

@@ -205,6 +205,17 @@ public class BuilderUtils {
     return nodeId;
   }
 
+  public static ContainerStatus newContainerStatus(ContainerId containerId,
+      ContainerState containerState, String diagnostics, int exitStatus) {
+    ContainerStatus containerStatus = recordFactory
+      .newRecordInstance(ContainerStatus.class);
+    containerStatus.setState(containerState);
+    containerStatus.setContainerId(containerId);
+    containerStatus.setDiagnostics(diagnostics);
+    containerStatus.setExitStatus(exitStatus);
+    return containerStatus;
+  }
+
   public static Container newContainer(ContainerId containerId,
       NodeId nodeId, String nodeHttpAddress,
       Resource resource, Priority priority, ContainerToken containerToken) {
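
The new factory centralizes ContainerStatus construction: it replaces hand-rolled record wiring in ContainerImpl (next diff) and is used by the new test at the bottom of this change. A hedged usage sketch; the wrapper class and diagnostics string here are hypothetical:

```java
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.util.BuilderUtils;

// Hypothetical caller: one factory call replaces four setter calls on
// a raw record instance.
public class ContainerStatusExample {
  static ContainerStatus runningStatus(ContainerId containerId) {
    return BuilderUtils.newContainerStatus(containerId,
        ContainerState.RUNNING, // state as the NM currently sees it
        "container is running", // free-form diagnostics (assumed text)
        0);                     // exit status; meaningful on completion
  }
}
```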

+ 3 - 7
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class ContainerImpl implements Container {
@@ -370,13 +371,8 @@ public class ContainerImpl implements Container {
   public ContainerStatus cloneAndGetContainerStatus() {
     this.readLock.lock();
     try {
-      ContainerStatus containerStatus =
-          recordFactory.newRecordInstance(ContainerStatus.class);
-      containerStatus.setState(getCurrentState());
-      containerStatus.setContainerId(this.launchContext.getContainerId());
-      containerStatus.setDiagnostics(diagnostics.toString());
-  	  containerStatus.setExitStatus(exitCode);
-      return containerStatus;
+      return BuilderUtils.newContainerStatus(this.getContainerID(),
+        getCurrentState(), diagnostics.toString(), exitCode);
     } finally {
       this.readLock.unlock();
     }

+ 5 - 5
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -67,16 +67,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
+import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
+import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
+import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
-import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
-import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
-import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
-import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
 
 /**
  * The ResourceManager is the main class that is a set of components.
@@ -256,7 +256,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   @Private
-  public static final class SchedulerEventDispatcher extends AbstractService
+  public static class SchedulerEventDispatcher extends AbstractService
       implements EventHandler<SchedulerEvent> {
 
     private final ResourceScheduler scheduler;

+ 2 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -265,8 +265,8 @@ public class ResourceTrackerService extends AbstractService implements
     HeartbeatResponse latestResponse = recordFactory
         .newRecordInstance(HeartbeatResponse.class);
     latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1);
-    latestResponse.addAllContainersToCleanup(rmNode.pullContainersToCleanUp());
-    latestResponse.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup());
+    latestResponse.addAllContainersToCleanup(rmNode.getContainersToCleanUp());
+    latestResponse.addAllApplicationsToCleanup(rmNode.getAppsToCleanup());
     latestResponse.setNodeAction(NodeAction.NORMAL);
 
     // 4. Send status to RMNode, saving the latest response.

+ 2 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

@@ -101,9 +101,9 @@ public interface RMNode {
   
   public RMNodeState getState();
 
-  public List<ContainerId> pullContainersToCleanUp();
+  public List<ContainerId> getContainersToCleanUp();
 
-  public List<ApplicationId> pullAppsToCleanup();
+  public List<ApplicationId> getAppsToCleanup();
 
   public HeartbeatResponse getLastHeartBeatResponse();
 }

+ 27 - 30
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -89,7 +89,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   /* set of containers that have just launched */
   private final Map<ContainerId, ContainerStatus> justLaunchedContainers = 
     new HashMap<ContainerId, ContainerStatus>();
-  
 
   /* set of containers that need to be cleaned */
   private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
@@ -248,54 +247,38 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   }
 
   @Override
-  public List<ApplicationId> pullAppsToCleanup() {
-    this.writeLock.lock();
-
-    try {
-      List<ApplicationId> lastfinishedApplications = new ArrayList<ApplicationId>();
-      lastfinishedApplications.addAll(this.finishedApplications);
-      this.finishedApplications.clear();
-      return lastfinishedApplications;
-    } finally {
-      this.writeLock.unlock();
-    }
-
-  }
-
-  @Private
-  public List<ContainerId> getContainersToCleanUp() {
+  public List<ApplicationId> getAppsToCleanup() {
     this.readLock.lock();
+
     try {
-      return new ArrayList<ContainerId>(containersToClean);
+      return new ArrayList<ApplicationId>(this.finishedApplications);
     } finally {
       this.readLock.unlock();
     }
+
   }
   
   @Override
-  public List<ContainerId> pullContainersToCleanUp() {
+  public List<ContainerId> getContainersToCleanUp() {
 
-    this.writeLock.lock();
+    this.readLock.lock();
 
     try {
-      List<ContainerId> containersToCleanUp = new ArrayList<ContainerId>();
-      containersToCleanUp.addAll(this.containersToClean);
-      this.containersToClean.clear();
-      return containersToCleanUp;
+      return new ArrayList<ContainerId>(this.containersToClean);
     } finally {
-      this.writeLock.unlock();
+      this.readLock.unlock();
     }
   };
 
   @Override
   public HeartbeatResponse getLastHeartBeatResponse() {
 
-    this.writeLock.lock();
+    this.readLock.lock();
 
     try {
       return this.latestHeartBeatResponse;
     } finally {
-      this.writeLock.unlock();
+      this.readLock.unlock();
     }
   }
 
@@ -407,14 +390,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       for (ContainerStatus remoteContainer : statusEvent.getContainers()) {
         ContainerId containerId = remoteContainer.getContainerId();
         
-        // Don't bother with containers already scheduled for cleanup,
-        // the scheduler doens't need to know any more about this container
+        // Don't bother with containers already scheduled for cleanup, or for
+        // applications already killed. The scheduler doesn't need to know any
+        // more about this container
         if (rmNode.containersToClean.contains(containerId)) {
           LOG.info("Container " + containerId + " already scheduled for " +
           		"cleanup, no further processing");
           continue;
         }
-        
+        if (rmNode.finishedApplications.contains(containerId
+            .getApplicationAttemptId().getApplicationId())) {
+          LOG.info("Container " + containerId
+              + " belongs to an application that is already killed,"
+              + " no further processing");
+          continue;
+        }
+
         // Process running containers
         if (remoteContainer.getState() == ContainerState.RUNNING) {
           if (!rmNode.justLaunchedContainers.containsKey(containerId)) {
@@ -435,6 +426,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       
       rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
           statusEvent.getKeepAliveAppIds());
+
+      // HeartBeat processing from our end is done, as node pulls the following
+      // lists before sending status-updates. Clear data-structures
+      rmNode.containersToClean.clear();
+      rmNode.finishedApplications.clear();
+
       return RMNodeState.RUNNING;
     }
   }
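
The semantic shift above is easy to miss in the hunks: the old pull* methods copied and cleared the lists under the write lock on every read, while the new get* methods are non-destructive reads under the read lock, and both lists are cleared exactly once, at the end of StatusUpdate processing. A stand-alone sketch of that pattern (a hypothetical simplified class, not the real RMNodeImpl):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical illustration of RMNodeImpl's new locking pattern.
class CleanupLists {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Set<String> containersToClean = new HashSet<String>();

  // Read side: hand out a defensive copy; do NOT clear here, since the
  // same list may be consulted again while this heartbeat is processed.
  List<String> getContainersToCleanUp() {
    lock.readLock().lock();
    try {
      return new ArrayList<String>(containersToClean);
    } finally {
      lock.readLock().unlock();
    }
  }

  // Write side: invoked once per status update, after the heartbeat
  // response has been built from the getter above.
  void finishStatusUpdate() {
    lock.writeLock().lock();
    try {
      containersToClean.clear();
    } finally {
      lock.writeLock().unlock();
    }
  }
}
```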

+ 14 - 5
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java

@@ -39,9 +39,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFini
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
@@ -61,6 +62,7 @@ import com.google.common.collect.Multiset;
  * Each running Application in the RM corresponds to one instance
  * of this class.
  */
+@SuppressWarnings("unchecked")
 public class SchedulerApp {
 
   private static final Log LOG = LogFactory.getLog(SchedulerApp.class);
@@ -174,13 +176,20 @@ public class SchedulerApp {
     this.appSchedulingInfo.stop(rmAppAttemptFinalState);
   }
 
-  synchronized public void containerLaunchedOnNode(ContainerId containerId) {
+  public synchronized void containerLaunchedOnNode(ContainerId containerId,
+      NodeId nodeId) {
     // Inform the container
     RMContainer rmContainer = 
         getRMContainer(containerId);
-    rmContainer.handle(
-        new RMContainerEvent(containerId, 
-            RMContainerEventType.LAUNCHED));
+    if (rmContainer == null) {
+      // Some unknown container sneaked into the system. Kill it.
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
+      return;
+    }
+
+    rmContainer.handle(new RMContainerEvent(containerId,
+      RMContainerEventType.LAUNCHED));
   }
 
   synchronized public void containerCompleted(RMContainer rmContainer,

+ 5 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -76,6 +77,7 @@ import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 
 @LimitedPrivate("yarn")
 @Evolving
+@SuppressWarnings("unchecked")
 public class CapacityScheduler 
 implements ResourceScheduler, CapacitySchedulerContext {
 
@@ -588,10 +590,12 @@ implements ResourceScheduler, CapacitySchedulerContext {
       LOG.info("Unknown application: " + applicationAttemptId + 
           " launched container " + containerId +
           " on node: " + node);
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
       return;
     }
     
-    application.containerLaunchedOnNode(containerId);
+    application.containerLaunchedOnNode(containerId, node.getNodeID());
   }
 
   @Override

+ 7 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -87,6 +88,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
 
 @LimitedPrivate("yarn")
 @Evolving
+@SuppressWarnings("unchecked")
 public class FifoScheduler implements ResourceScheduler {
 
   private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
@@ -282,7 +284,6 @@ public class FifoScheduler implements ResourceScheduler {
     return nodes.get(nodeId);
   }
   
-  @SuppressWarnings("unchecked")
   private synchronized void addApplication(ApplicationAttemptId appAttemptId,
       String user) {
     // TODO: Fix store
@@ -655,10 +656,14 @@ public class FifoScheduler implements ResourceScheduler {
       LOG.info("Unknown application: " + applicationAttemptId + 
           " launched container " + containerId +
           " on node: " + node);
+      // Some unknown container sneaked into the system. Kill it.
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+
       return;
     }
     
-    application.containerLaunchedOnNode(containerId);
+    application.containerLaunchedOnNode(containerId, node.getNodeID());
   }
 
   @Lock(FifoScheduler.class)

+ 7 - 9
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java

@@ -39,15 +39,17 @@ public class MockNM {
 
   private int responseId;
   private NodeId nodeId;
-  private final String nodeIdStr;
   private final int memory;
   private final ResourceTrackerService resourceTracker;
   private final int httpPort = 2;
 
   MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
-    this.nodeIdStr = nodeIdStr;
     this.memory = memory;
     this.resourceTracker = resourceTracker;
+    String[] splits = nodeIdStr.split(":");
+    nodeId = Records.newRecord(NodeId.class);
+    nodeId.setHost(splits[0]);
+    nodeId.setPort(Integer.parseInt(splits[1]));
   }
 
   public NodeId getNodeId() {
@@ -63,14 +65,10 @@ public class MockNM {
         new HashMap<ApplicationId, List<ContainerStatus>>();
     conts.put(container.getId().getApplicationAttemptId().getApplicationId(), 
         Arrays.asList(new ContainerStatus[] { container.getContainerStatus() }));
-    nodeHeartbeat(conts, true,nodeId);
+    nodeHeartbeat(conts, true);
   }
 
   public NodeId registerNode() throws Exception {
-    String[] splits = nodeIdStr.split(":");
-    nodeId = Records.newRecord(NodeId.class);
-    nodeId.setHost(splits[0]);
-    nodeId.setPort(Integer.parseInt(splits[1]));
     RegisterNodeManagerRequest req = Records.newRecord(
         RegisterNodeManagerRequest.class);
     req.setNodeId(nodeId);
@@ -83,11 +81,11 @@ public class MockNM {
   }
 
   public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
-    return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b,nodeId);
+    return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b);
   }
 
   public HeartbeatResponse nodeHeartbeat(Map<ApplicationId, 
-      List<ContainerStatus>> conts, boolean isHealthy, NodeId nodeId) throws Exception {
+      List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
     NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
     NodeStatus status = Records.newRecord(NodeStatus.class);
     status.setNodeId(nodeId);

+ 2 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -152,13 +152,13 @@ public class MockNodes {
       }
 
       @Override
-      public List<ApplicationId> pullAppsToCleanup() {
+      public List<ApplicationId> getAppsToCleanup() {
         // TODO Auto-generated method stub
         return null;
       }
 
       @Override
-      public List<ContainerId> pullContainersToCleanUp() {
+      public List<ContainerId> getContainersToCleanUp() {
         // TODO Auto-generated method stub
         return null;
       }

+ 145 - 5
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java

@@ -19,26 +19,39 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import junit.framework.Assert;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.Test;
-import org.mortbay.log.Log;
 
 public class TestApplicationCleanup {
 
+  private static final Log LOG = LogFactory
+    .getLog(TestApplicationCleanup.class);
+
   @Test
   public void testAppCleanup() throws Exception {
     Logger rootLogger = LogManager.getRootLogger();
@@ -67,11 +80,13 @@ public class TestApplicationCleanup {
     List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>()).getAllocatedContainers();
     int contReceived = conts.size();
-    while (contReceived < request) {
+    int waitCount = 0;
+    while (contReceived < request && waitCount++ < 20) {
       conts = am.allocate(new ArrayList<ResourceRequest>(),
           new ArrayList<ContainerId>()).getAllocatedContainers();
       contReceived += conts.size();
-      Log.info("Got " + contReceived + " containers. Waiting to get " + request);
+      LOG.info("Got " + contReceived + " containers. Waiting to get "
+               + request);
       Thread.sleep(2000);
     }
     Assert.assertEquals(request, conts.size());
@@ -86,11 +101,12 @@ public class TestApplicationCleanup {
     
     //currently only containers are cleaned via this
     //AM container is cleaned via container launcher
-    while (cleanedConts < 2 || cleanedApps < 1) {
+    waitCount = 0;
+    while ((cleanedConts < 3 || cleanedApps < 1) && waitCount++ < 20) {
       HeartbeatResponse resp = nm1.nodeHeartbeat(true);
       contsToClean = resp.getContainersToCleanupList();
       apps = resp.getApplicationsToCleanupList();
-      Log.info("Waiting to get cleanup events.. cleanedConts: "
+      LOG.info("Waiting to get cleanup events.. cleanedConts: "
           + cleanedConts + " cleanedApps: " + cleanedApps);
       cleanedConts += contsToClean.size();
       cleanedApps += apps.size();
@@ -99,6 +115,130 @@ public class TestApplicationCleanup {
     
     Assert.assertEquals(1, apps.size());
     Assert.assertEquals(app.getApplicationId(), apps.get(0));
+    Assert.assertEquals(1, cleanedApps);
+    Assert.assertEquals(3, cleanedConts);
+
+    rm.stop();
+  }
+
+  @Test
+  public void testContainerCleanup() throws Exception {
+
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm = new MockRM() {
+      @Override
+      protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+        return new SchedulerEventDispatcher(this.scheduler) {
+          @Override
+          public void handle(SchedulerEvent event) {
+            scheduler.handle(event);
+          }
+        };
+      }
+
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("h1:1234", 5000);
+
+    RMApp app = rm.submitApp(2000);
+
+    //kick the scheduling
+    nm1.nodeHeartbeat(true);
+
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+    
+    //request for containers
+    int request = 2;
+    am.allocate("h1" , 1000, request, 
+        new ArrayList<ContainerId>());
+    dispatcher.await();
+    
+    //kick the scheduler
+    nm1.nodeHeartbeat(true);
+    List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+    int contReceived = conts.size();
+    int waitCount = 0;
+    while (contReceived < request && waitCount++ < 20) {
+      conts = am.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers();
+      dispatcher.await();
+      contReceived += conts.size();
+      LOG.info("Got " + contReceived + " containers. Waiting to get "
+               + request);
+      Thread.sleep(2000);
+    }
+    Assert.assertEquals(request, conts.size());
+
+    // Release a container.
+    ArrayList<ContainerId> release = new ArrayList<ContainerId>();
+    release.add(conts.get(1).getId());
+    am.allocate(new ArrayList<ResourceRequest>(), release);
+    dispatcher.await();
+
+    // Send one more heartbeat with a fake running container. This is to
+    // simulate the situation that can happen if the NM reports that container
+    // is running in the same heartbeat when the RM asks it to clean it up.
+    Map<ApplicationId, List<ContainerStatus>> containerStatuses =
+        new HashMap<ApplicationId, List<ContainerStatus>>();
+    ArrayList<ContainerStatus> containerStatusList =
+        new ArrayList<ContainerStatus>();
+    containerStatusList.add(BuilderUtils.newContainerStatus(conts.get(1)
+      .getId(), ContainerState.RUNNING, "nothing", 0));
+    containerStatuses.put(app.getApplicationId(), containerStatusList);
+
+    HeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
+    dispatcher.await();
+    List<ContainerId> contsToClean = resp.getContainersToCleanupList();
+    int cleanedConts = contsToClean.size();
+    waitCount = 0;
+    while (cleanedConts < 1 && waitCount++ < 20) {
+      resp = nm1.nodeHeartbeat(true);
+      dispatcher.await();
+      contsToClean = resp.getContainersToCleanupList();
+      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
+      cleanedConts += contsToClean.size();
+      Thread.sleep(1000);
+    }
+    LOG.info("Got cleanup for " + contsToClean.get(0));
+    Assert.assertEquals(1, cleanedConts);
+
+    // Now to test the case when RM already gave cleanup, and NM suddenly
+    // realizes that the container is running.
+    LOG.info("Testing container launch much after release and "
+        + "NM getting cleanup");
+    containerStatuses.clear();
+    containerStatusList.clear();
+    containerStatusList.add(BuilderUtils.newContainerStatus(conts.get(1)
+      .getId(), ContainerState.RUNNING, "nothing", 0));
+    containerStatuses.put(app.getApplicationId(), containerStatusList);
+
+    resp = nm1.nodeHeartbeat(containerStatuses, true);
+    dispatcher.await();
+    contsToClean = resp.getContainersToCleanupList();
+    cleanedConts = contsToClean.size();
+    // The cleanup list won't be instantaneous as it is given out by scheduler
+    // and not RMNodeImpl.
+    waitCount = 0;
+    while (cleanedConts < 1 && waitCount++ < 20) {
+      resp = nm1.nodeHeartbeat(true);
+      dispatcher.await();
+      contsToClean = resp.getContainersToCleanupList();
+      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
+      cleanedConts += contsToClean.size();
+      Thread.sleep(1000);
+    }
+    LOG.info("Got cleanup for " + contsToClean.get(0));
+    Assert.assertEquals(1, cleanedConts);
 
     rm.stop();
   }

+ 1 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -164,8 +164,7 @@ public class TestResourceTrackerService {
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
 
     nodeHeartbeat = nm2.nodeHeartbeat(
-        new HashMap<ApplicationId, List<ContainerStatus>>(), true,
-        recordFactory.newRecordInstance(NodeId.class));
+      new HashMap<ApplicationId, List<ContainerStatus>>(), true);
     Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction()));
     checkRebootedNMCount(rm, ++initialMetricCount);
   }