Explorar o código

Moving RmContainer into the scheduler.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1153445 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli %!s(int64=14) %!d(string=hai) anos
pai
achega
263d01e146
Modificáronse 24 ficheiros con 476 adicións e 486 borrados
  1. 5 14
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
  2. 5 11
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
  3. 0 4
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
  4. 0 8
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
  5. 0 28
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  6. 2 1
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java
  7. 0 4
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
  8. 4 65
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
  9. 5 3
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java
  10. 7 12
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
  11. 0 12
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
  12. 2 71
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  13. 28 0
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeReport.java
  14. 90 14
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
  15. 16 66
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
  16. 28 0
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java
  17. 52 4
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
  18. 36 34
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  19. 17 13
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  20. 24 0
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java
  21. 8 1
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerFinishedSchedulerEvent.java
  22. 2 3
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
  23. 143 117
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  24. 2 1
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java

+ 5 - 14
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -56,8 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.AbstractService;
 
 
@@ -217,26 +216,18 @@ public class ApplicationMasterService extends AbstractService implements
       List<Container> release = request.getReleaseList();
       List<Container> release = request.getReleaseList();
 
 
       // Send new requests to appAttempt.
       // Send new requests to appAttempt.
-      if (!ask.isEmpty()) {        this.rScheduler.allocate(appAttemptId, ask);
-      }
-
-      // Send events to the containers being released.
-      for (Container releasedContainer : release) {
-        this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMContainerEvent(releasedContainer.getId(),
-                RMContainerEventType.RELEASED));
-      }
+      Allocation allocation = 
+          this.rScheduler.allocate(appAttemptId, ask, release);
 
 
       RMApp app = this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
       RMApp app = this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
       RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
       RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
 
 
       AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
       AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
-      response.addAllNewContainers(appAttempt.pullNewlyAllocatedContainers());
+      response.addAllNewContainers(allocation.getContainers());
       response.addAllFinishedContainers(appAttempt
       response.addAllFinishedContainers(appAttempt
           .pullJustFinishedContainers());
           .pullJustFinishedContainers());
       response.setResponseId(lastResponse.getResponseId() + 1);
       response.setResponseId(lastResponse.getResponseId() + 1);
-      response.setAvailableResources(rScheduler
-          .getResourceLimit(appAttemptId));
+      response.setAvailableResources(allocation.getResourceLimit());
       responseMap.put(appAttemptId, response);
       responseMap.put(appAttemptId, response);
       allocateResponse.setAMResponse(response);
       allocateResponse.setAMResponse(response);
       return allocateResponse;
       return allocateResponse;

+ 5 - 11
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -354,21 +354,15 @@ public class ClientRMService extends AbstractService implements
   }
   }
 
 
   private NodeReport createNodeReports(RMNode rmNode) {
   private NodeReport createNodeReports(RMNode rmNode) {
-    NodeReport report = 
-      recordFactory.newRecordInstance(NodeReport.class);
+    NodeReport report = recordFactory.newRecordInstance(NodeReport.class);
     report.setNodeId(rmNode.getNodeID());
     report.setNodeId(rmNode.getNodeID());
     report.setRackName(rmNode.getRackName());
     report.setRackName(rmNode.getRackName());
     report.setCapability(rmNode.getTotalCapability());
     report.setCapability(rmNode.getTotalCapability());
     report.setNodeHealthStatus(rmNode.getNodeHealthStatus());
     report.setNodeHealthStatus(rmNode.getNodeHealthStatus());
-    List<Container> containers = rmNode.getRunningContainers();
-    int userdResource = 0;
-    for (Container c : containers) {
-      userdResource += c.getResource().getMemory();
-    }
-    Resource usedRsrc = recordFactory.newRecordInstance(Resource.class);
-    usedRsrc.setMemory(userdResource);
-    report.setUsed(usedRsrc);
-    report.setNumContainers(rmNode.getNumContainers());
+    org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport schedulerNodeReport = scheduler
+        .getNodeReport(rmNode.getNodeID());
+    report.setUsed(schedulerNodeReport.getUsedResources());
+    report.setNumContainers(schedulerNodeReport.getNumContainers());
     return report;
     return report;
   }
   }
 
 

+ 0 - 4
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java

@@ -3,7 +3,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentMap;
 
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore;
@@ -11,7 +10,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.NodeStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 
 public interface RMContext {
 public interface RMContext {
@@ -24,8 +22,6 @@ public interface RMContext {
 
 
   ConcurrentMap<ApplicationId, RMApp> getRMApps();
   ConcurrentMap<ApplicationId, RMApp> getRMApps();
 
 
-  ConcurrentMap<ContainerId, RMContainer> getRMContainers();
-
   ConcurrentMap<NodeId, RMNode> getRMNodes();
   ConcurrentMap<NodeId, RMNode> getRMNodes();
 
 
   AMLivelinessMonitor getAMLivelinessMonitor();
   AMLivelinessMonitor getAMLivelinessMonitor();

+ 0 - 8
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

@@ -24,9 +24,6 @@ public class RMContextImpl implements RMContext {
   private final ConcurrentMap<ApplicationId, RMApp> applications
   private final ConcurrentMap<ApplicationId, RMApp> applications
     = new ConcurrentHashMap<ApplicationId, RMApp>();
     = new ConcurrentHashMap<ApplicationId, RMApp>();
 
 
-  private final ConcurrentMap<ContainerId, RMContainer> containers
-    = new ConcurrentHashMap<ContainerId, RMContainer>();
-
   private final ConcurrentMap<NodeId, RMNode> nodes
   private final ConcurrentMap<NodeId, RMNode> nodes
     = new ConcurrentHashMap<NodeId, RMNode>();
     = new ConcurrentHashMap<NodeId, RMNode>();
 
 
@@ -62,11 +59,6 @@ public class RMContextImpl implements RMContext {
     return this.applications;
     return this.applications;
   }
   }
 
 
-  @Override
-  public ConcurrentMap<ContainerId, RMContainer> getRMContainers() {
-    return this.containers;
-  }
-
   @Override
   @Override
   public ConcurrentMap<NodeId, RMNode> getRMNodes() {
   public ConcurrentMap<NodeId, RMNode> getRMNodes() {
     return this.nodes;
     return this.nodes;

+ 0 - 28
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -150,10 +150,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
     this.rmDispatcher.register(RMNodeEventType.class,
     this.rmDispatcher.register(RMNodeEventType.class,
         new NodeEventDispatcher(this.rmContext));
         new NodeEventDispatcher(this.rmContext));
 
 
-    // Register event handler for RMContainer
-    this.rmDispatcher.register(RMContainerEventType.class,
-        new ContainerEventDispatcher(this.rmContext));
-
     //TODO change this to be random
     //TODO change this to be random
     this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
     this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
         .createSecretKey("Dummy".getBytes()));
         .createSecretKey("Dummy".getBytes()));
@@ -294,30 +290,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
     }
     }
   }
   }
 
 
-  private static final class ContainerEventDispatcher implements
-      EventHandler<RMContainerEvent> {
-
-    private final RMContext rmContext;
-
-    public ContainerEventDispatcher(RMContext rmContext) {
-      this.rmContext = rmContext;
-    }
-
-    @Override
-    public void handle(RMContainerEvent event) {
-      ContainerId containerId = event.getContainerId();
-      RMContainer container = this.rmContext.getRMContainers().get(containerId);
-      if (container != null) {
-        try {
-          ((EventHandler<RMContainerEvent>) container).handle(event);
-        } catch (Throwable t) {
-          LOG.error("Error in handling event type " + event.getType()
-              + " for container " + containerId, t);
-        }
-      }
-    }
-  }
-
   protected void startWepApp() {
   protected void startWepApp() {
     webApp = WebApps.$for("yarn", masterService).at(
     webApp = WebApps.$for("yarn", masterService).at(
         conf.get(YarnConfiguration.RM_WEBAPP_BIND_ADDRESS,
         conf.get(YarnConfiguration.RM_WEBAPP_BIND_ADDRESS,

+ 2 - 1
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java

@@ -114,7 +114,8 @@ public class ZKStore implements Store {
     node.setCapability(rmNode.getTotalCapability());
     node.setCapability(rmNode.getTotalCapability());
     // TODO: FIXME
     // TODO: FIXME
 //    node.setUsed(nodeInfo.getUsedResource());
 //    node.setUsed(nodeInfo.getUsedResource());
-    node.setNumContainers(rmNode.getNumContainers());
+    // TODO: acm: refactor2 FIXME
+//  node.setNumContainers(rmNode.getNumContainers());
     return (NodeReportPBImpl)node;
     return (NodeReportPBImpl)node;
   }
   }
 
 

+ 0 - 4
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java

@@ -31,12 +31,8 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent>{
 
 
   List<Container> pullJustFinishedContainers();
   List<Container> pullJustFinishedContainers();
 
 
-  List<Container> pullNewlyAllocatedContainers();
-
   List<Container> getJustFinishedContainers();
   List<Container> getJustFinishedContainers();
 
 
-  List<Container> getNewlyAllocatedContainers();
-
   Container getMasterContainer();
   Container getMasterContainer();
 
 
   ApplicationSubmissionContext getSubmissionContext();
   ApplicationSubmissionContext getSubmissionContext();

+ 4 - 65
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -79,14 +79,9 @@ public class RMAppAttemptImpl implements RMAppAttempt {
   private final String clientToken;
   private final String clientToken;
   private final ApplicationSubmissionContext submissionContext;
   private final ApplicationSubmissionContext submissionContext;
 
 
-  private Map<ContainerId, ContainerId> liveContainers
-    = new HashMap<ContainerId, ContainerId>();
-
   //nodes on while this attempt's containers ran
   //nodes on while this attempt's containers ran
   private final Set<NodeId> ranNodes = 
   private final Set<NodeId> ranNodes = 
     new HashSet<NodeId>();
     new HashSet<NodeId>();
-  private final List<Container> newlyAllocatedContainers = 
-    new ArrayList<Container>();
   private final List<Container> justFinishedContainers = 
   private final List<Container> justFinishedContainers = 
     new ArrayList<Container>();
     new ArrayList<Container>();
   private Container masterContainer;
   private Container masterContainer;
@@ -162,8 +157,7 @@ public class RMAppAttemptImpl implements RMAppAttempt {
       .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
       .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
           RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
           RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
       .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
       .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
-          RMAppAttemptEventType.CONTAINER_ALLOCATED,
-          new NormalContainerAllocatedTransition())
+          RMAppAttemptEventType.CONTAINER_ALLOCATED)
       .addTransition(
       .addTransition(
           RMAppAttemptState.RUNNING,
           RMAppAttemptState.RUNNING,
           EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FAILED),
           EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FAILED),
@@ -332,16 +326,6 @@ public class RMAppAttemptImpl implements RMAppAttempt {
     }
     }
   }
   }
 
 
-  @Override
-  public List<Container> getNewlyAllocatedContainers() {
-    this.readLock.lock();
-    try {
-      return this.newlyAllocatedContainers;
-    } finally {
-      this.readLock.unlock();
-    }
-  }
-
   @Override
   @Override
   public List<Container> pullJustFinishedContainers() {
   public List<Container> pullJustFinishedContainers() {
     this.writeLock.lock();
     this.writeLock.lock();
@@ -362,27 +346,6 @@ public class RMAppAttemptImpl implements RMAppAttempt {
     return ranNodes;
     return ranNodes;
   }
   }
 
 
-  @Override
-  public List<Container> pullNewlyAllocatedContainers() {
-    this.writeLock.lock();
-
-    try {
-      List<Container> returnList = new ArrayList<Container>(
-          this.newlyAllocatedContainers.size());
-      returnList.addAll(this.newlyAllocatedContainers);
-      for (Container cont : newlyAllocatedContainers) {
-        ranNodes.add(cont.getNodeId());//add to the nodes set when these containers
-        //are pulled by AM
-        eventHandler.handle(
-            new RMContainerEvent(cont.getId(), RMContainerEventType.ACQUIRED));
-      }
-      this.newlyAllocatedContainers.clear();
-      return returnList;
-    } finally {
-      this.writeLock.unlock();
-    }
-  }
-
   @Override
   @Override
   public Container getMasterContainer() {
   public Container getMasterContainer() {
     this.readLock.lock();
     this.readLock.lock();
@@ -461,6 +424,8 @@ public class RMAppAttemptImpl implements RMAppAttempt {
     }
     }
   }
   }
 
 
+  private static final List<Container> EMPTY_CONTAINER_LIST = 
+      new ArrayList<Container>();
   private static final class ScheduleTransition extends BaseTransition {
   private static final class ScheduleTransition extends BaseTransition {
     @Override
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
     public void transition(RMAppAttemptImpl appAttempt,
@@ -479,7 +444,7 @@ public class RMAppAttemptImpl implements RMAppAttempt {
           + appAttempt.applicationAttemptId + " required " + request);
           + appAttempt.applicationAttemptId + " required " + request);
 
 
       appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
       appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
-          Collections.singletonList(request));
+          Collections.singletonList(request), EMPTY_CONTAINER_LIST);
     }
     }
   }
   }
 
 
@@ -657,29 +622,6 @@ public class RMAppAttemptImpl implements RMAppAttempt {
       // Tell the launcher to cleanup.
       // Tell the launcher to cleanup.
       appAttempt.eventHandler.handle(new AMLauncherEvent(
       appAttempt.eventHandler.handle(new AMLauncherEvent(
           AMLauncherEventType.CLEANUP, appAttempt));
           AMLauncherEventType.CLEANUP, appAttempt));
-
-      // Kill all the allocated containers.
-      for (ContainerId containerid : appAttempt.liveContainers.keySet()) {
-        appAttempt.eventHandler.handle(new RMContainerEvent(containerid,
-            RMContainerEventType.KILL));
-      }
-    }
-  }
-
-  private static final class NormalContainerAllocatedTransition extends
-      BaseTransition {
-    @Override
-    public void transition(RMAppAttemptImpl appAttempt,
-        RMAppAttemptEvent event) {
-      RMAppAttemptContainerAllocatedEvent allocatedEvent
-          = (RMAppAttemptContainerAllocatedEvent) event;
-
-      // Store the container for pulling
-      Container container = allocatedEvent.getContainer();
-      appAttempt.newlyAllocatedContainers.add(container);
-
-      // Add it to allContainers list.
-      appAttempt.liveContainers.put(container.getId(), container.getId());
     }
     }
   }
   }
 
 
@@ -745,9 +687,6 @@ public class RMAppAttemptImpl implements RMAppAttempt {
 
 
       // Normal container.
       // Normal container.
 
 
-      // Remove it from allContainers list.
-      appAttempt.liveContainers.remove(container.getId());
-
       // Put it in completedcontainers list
       // Put it in completedcontainers list
       appAttempt.justFinishedContainers.add(container);
       appAttempt.justFinishedContainers.add(container);
       return RMAppAttemptState.RUNNING;
       return RMAppAttemptState.RUNNING;

+ 5 - 3
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java

@@ -20,14 +20,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
 import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
 import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
 
 
 public class ContainerAllocationExpirer extends
 public class ContainerAllocationExpirer extends
-    AbstractLivelinessMonitor<ContainerId> {
+    AbstractLivelinessMonitor<Container> {
 
 
   private EventHandler dispatcher;
   private EventHandler dispatcher;
 
 
@@ -46,7 +48,7 @@ public class ContainerAllocationExpirer extends
   }
   }
 
 
   @Override
   @Override
-  protected void expire(ContainerId id) {
-    dispatcher.handle(new RMContainerEvent(id, RMContainerEventType.EXPIRE));
+  protected void expire(Container container) {
+    dispatcher.handle(new ContainerExpiredSchedulerEvent(container));
   }
   }
 }
 }

+ 7 - 12
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java

@@ -15,7 +15,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerFinishedSchedulerEvent;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachine;
@@ -91,12 +90,12 @@ public class RMContainerImpl implements RMContainer {
   private final EventHandler eventHandler;
   private final EventHandler eventHandler;
   private final ContainerAllocationExpirer containerAllocationExpirer;
   private final ContainerAllocationExpirer containerAllocationExpirer;
 
 
-  public RMContainerImpl(ContainerId containerId,
-      ApplicationAttemptId appAttemptId, NodeId nodeId, Container container,
+  public RMContainerImpl(Container container,
+      ApplicationAttemptId appAttemptId, NodeId nodeId,
       EventHandler handler,
       EventHandler handler,
       ContainerAllocationExpirer containerAllocationExpirer) {
       ContainerAllocationExpirer containerAllocationExpirer) {
     this.stateMachine = stateMachineFactory.make(this);
     this.stateMachine = stateMachineFactory.make(this);
-    this.containerId = containerId;
+    this.containerId = container.getId();
     this.nodeId = nodeId;
     this.nodeId = nodeId;
     this.container = container;
     this.container = container;
     this.appAttemptId = appAttemptId;
     this.appAttemptId = appAttemptId;
@@ -182,7 +181,7 @@ public class RMContainerImpl implements RMContainer {
     @Override
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
     public void transition(RMContainerImpl container, RMContainerEvent event) {
       // Register with containerAllocationExpirer.
       // Register with containerAllocationExpirer.
-      container.containerAllocationExpirer.register(container.containerId);
+      container.containerAllocationExpirer.register(container.getContainer());
     }
     }
   }
   }
 
 
@@ -191,7 +190,7 @@ public class RMContainerImpl implements RMContainer {
     @Override
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
     public void transition(RMContainerImpl container, RMContainerEvent event) {
       // Unregister from containerAllocationExpirer.
       // Unregister from containerAllocationExpirer.
-      container.containerAllocationExpirer.unregister(container.containerId);
+      container.containerAllocationExpirer.unregister(container.getContainer());
     }
     }
   }
   }
 
 
@@ -211,10 +210,6 @@ public class RMContainerImpl implements RMContainer {
       // Inform AppAttempt
       // Inform AppAttempt
       container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
       container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
           container.appAttemptId, container.container));
           container.appAttemptId, container.container));
-
-      // Inform Scheduler
-      container.eventHandler.handle(new ContainerFinishedSchedulerEvent(
-          container.container));
     }
     }
   }
   }
 
 
@@ -224,7 +219,7 @@ public class RMContainerImpl implements RMContainer {
     public void transition(RMContainerImpl container, RMContainerEvent event) {
     public void transition(RMContainerImpl container, RMContainerEvent event) {
 
 
       // Unregister from containerAllocationExpirer.
       // Unregister from containerAllocationExpirer.
-      container.containerAllocationExpirer.unregister(container.containerId);
+      container.containerAllocationExpirer.unregister(container.getContainer());
 
 
       // Inform AppAttempt, scheduler etc.
       // Inform AppAttempt, scheduler etc.
       super.transition(container, event);
       super.transition(container, event);
@@ -237,7 +232,7 @@ public class RMContainerImpl implements RMContainer {
     public void transition(RMContainerImpl container, RMContainerEvent event) {
     public void transition(RMContainerImpl container, RMContainerEvent event) {
 
 
       // Unregister from containerAllocationExpirer.
       // Unregister from containerAllocationExpirer.
-      container.containerAllocationExpirer.unregister(container.containerId);
+      container.containerAllocationExpirer.unregister(container.getContainer());
 
 
       // Inform node
       // Inform node
       container.eventHandler.handle(new RMNodeCleanContainerEvent(
       container.eventHandler.handle(new RMNodeCleanContainerEvent(

+ 0 - 12
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

@@ -99,20 +99,8 @@ public interface RMNode {
    */
    */
   public Node getNode();
   public Node getNode();
   
   
-  /**
-   * The current number of containers for this node
-   * @return the number of containers
-   */
-  public int getNumContainers();
-
   public RMNodeState getState();
   public RMNodeState getState();
 
 
-  /**
-   * Get running containers on this node.
-   * @return running containers
-   */
-  public List<Container> getRunningContainers();
-
   public List<ContainerId> pullContainersToCleanUp();
   public List<ContainerId> pullContainersToCleanUp();
 
 
   public List<ApplicationId> pullAppsToCleanup();
   public List<ApplicationId> pullAppsToCleanup();

+ 2 - 71
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -21,9 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -35,9 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -46,9 +42,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -88,10 +81,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private final NodeHealthStatus nodeHealthStatus = recordFactory
   private final NodeHealthStatus nodeHealthStatus = recordFactory
       .newRecordInstance(NodeHealthStatus.class);
       .newRecordInstance(NodeHealthStatus.class);
   
   
-  /* set of containers that are allocated containers */
-  private final Map<ContainerId, Container> launchedContainers = 
-    new TreeMap<ContainerId, Container>();
-  
   /* set of containers that need to be cleaned */
   /* set of containers that need to be cleaned */
   private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
   private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
       new ContainerIdComparator());
       new ContainerIdComparator());
@@ -208,30 +197,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     return this.node;
     return this.node;
   }
   }
   
   
-  @Override
-  public List<Container> getRunningContainers() {
-    this.readLock.lock();
-
-    try {
-      List<Container> containers = new ArrayList<Container>();
-      containers.addAll(this.launchedContainers.values());
-      return containers;
-    } finally {
-      this.readLock.unlock();
-    }
-  }
-
-  @Override
-  public int getNumContainers() {
-    this.readLock.lock();
-
-    try {
-      return this.launchedContainers.size();
-    } finally {
-      this.readLock.unlock();
-    }
-  }
-
   @Override
   @Override
   public NodeHealthStatus getNodeHealthStatus() {
   public NodeHealthStatus getNodeHealthStatus() {
     this.readLock.lock();
     this.readLock.lock();
@@ -319,13 +284,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
     }
   }
   }
   
   
-  private void killAllContainers() {
-    for (ContainerId contId : launchedContainers.keySet()) {
-      context.getDispatcher().getEventHandler().handle(
-          new RMContainerEvent(contId, RMContainerEventType.KILL));
-    }
-  }
-
   public static class CleanUpAppTransition
   public static class CleanUpAppTransition
     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
 
@@ -352,7 +310,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
 
     @Override
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
-      rmNode.killAllContainers();
+      // Inform the scheduler
       rmNode.context.getDispatcher().getEventHandler().handle(
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodeRemovedSchedulerEvent(rmNode));
           new NodeRemovedSchedulerEvent(rmNode));
 
 
@@ -374,39 +332,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       rmNode.latestHeartBeatResponse = statusEvent.getLatestResponse();
       rmNode.latestHeartBeatResponse = statusEvent.getLatestResponse();
 
 
       if (!statusEvent.getNodeHealthStatus().getIsNodeHealthy()) {
       if (!statusEvent.getNodeHealthStatus().getIsNodeHealthy()) {
-        rmNode.killAllContainers();
+        // Inform the scheduler
         rmNode.context.getDispatcher().getEventHandler().handle(
         rmNode.context.getDispatcher().getEventHandler().handle(
             new NodeRemovedSchedulerEvent(rmNode));
             new NodeRemovedSchedulerEvent(rmNode));
         return RMNodeState.UNHEALTHY;
         return RMNodeState.UNHEALTHY;
       }
       }
 
 
-      Map<ApplicationId, List<Container>> remoteAppContainersMap = statusEvent
-          .getContainersCollection();
-      for (List<Container> remoteContainerList : remoteAppContainersMap
-          .values()) {
-        for (Container remoteContainer : remoteContainerList) {
-
-          // Process running containers
-          ContainerId containerId = remoteContainer.getId();
-          if (remoteContainer.getState() == ContainerState.RUNNING
-              || remoteContainer.getState() == ContainerState.INITIALIZING) {
-            if (!rmNode.launchedContainers.containsKey(containerId)) {
-              // Just launched container. RM knows about it the first time.
-              rmNode.launchedContainers.put(containerId, remoteContainer);
-              rmNode.context.getDispatcher().getEventHandler().handle(
-                  new RMContainerEvent(containerId,
-                      RMContainerEventType.LAUNCHED));
-            }
-          } else {
-            // A finished container
-            // Send event to the finished container
-            rmNode.context.getDispatcher().getEventHandler().handle(
-                new RMContainerFinishedEvent(containerId, remoteContainer
-                    .getContainerStatus()));
-          }
-        }
-      }
-
       rmNode.context.getDispatcher().getEventHandler().handle(
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodeUpdateSchedulerEvent(rmNode, statusEvent
           new NodeUpdateSchedulerEvent(rmNode, statusEvent
               .getContainersCollection()));
               .getContainersCollection()));

+ 28 - 0
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeReport.java

@@ -0,0 +1,28 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * Node usage report.
+ */
+@Private
+@Stable
+public class NodeReport {
+  private final Resource usedResources;
+  private final int numContainers;
+  
+  public NodeReport(Resource used, int numContainers) {
+    this.usedResources = used;
+    this.numContainers = numContainers;
+  }
+
+  public Resource getUsedResources() {
+    return usedResources;
+  }
+
+  public int getNumContainers() {
+    return numContainers;
+  }
+}

+ 90 - 14
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java

@@ -1,6 +1,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
@@ -9,6 +11,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -16,6 +19,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
 
 
 public class SchedulerApp {
 public class SchedulerApp {
 
 
@@ -32,6 +39,11 @@ public class SchedulerApp {
   private Resource resourceLimit = recordFactory
   private Resource resourceLimit = recordFactory
       .newRecordInstance(Resource.class);
       .newRecordInstance(Resource.class);
 
 
+  private Map<ContainerId, RMContainer> liveContainers
+  = new HashMap<ContainerId, RMContainer>();
+  private List<Container> newlyAllocatedContainers = 
+      new ArrayList<Container>();
+
   public SchedulerApp(AppSchedulingInfo application, Queue queue) {
   public SchedulerApp(AppSchedulingInfo application, Queue queue) {
     this.appSchedulingInfo = application;
     this.appSchedulingInfo = application;
     this.queue = queue;
     this.queue = queue;
@@ -62,10 +74,15 @@ public class SchedulerApp {
     return this.appSchedulingInfo.getNewContainerId();
     return this.appSchedulingInfo.getNewContainerId();
   }
   }
   
   
+  @Deprecated
   public List<Container> getCurrentContainers() {
   public List<Container> getCurrentContainers() {
     return this.appSchedulingInfo.getCurrentContainers();
     return this.appSchedulingInfo.getCurrentContainers();
   }
   }
 
 
+  public synchronized Collection<RMContainer> getLiveContainers() {
+    return liveContainers.values();
+  }
+  
   public Collection<Priority> getPriorities() {
   public Collection<Priority> getPriorities() {
     return this.appSchedulingInfo.getPriorities();
     return this.appSchedulingInfo.getPriorities();
   }
   }
@@ -90,30 +107,85 @@ public class SchedulerApp {
     return this.queue;
     return this.queue;
   }
   }
 
 
-  public void stop(RMAppAttemptState rmAppAttemptFinalState) {
+  public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
+    // Kill all 'live' containers
+    for (RMContainer container : getLiveContainers()) {
+      completedContainer(container.getContainer(), 
+          RMContainerEventType.KILL); 
+    }
+    
+    // Cleanup all scheduling information
     this.appSchedulingInfo.stop(rmAppAttemptFinalState);
     this.appSchedulingInfo.stop(rmAppAttemptFinalState);
   }
   }
 
 
-  synchronized public void completedContainer(Container container, 
-      Resource containerResource) {
-    if (container != null) {
-      LOG.info("Completed container: " + container);
+  synchronized public void launchContainer(ContainerId containerId) {
+    // Inform the container
+    RMContainer rmContainer = 
+        getRMContainer(containerId);
+    rmContainer.handle(
+        new RMContainerEvent(containerId, 
+            RMContainerEventType.LAUNCHED));
+  }
+
+  public synchronized void killContainers(
+      SchedulerApp application) {
+  }
+
+  synchronized public void completedContainer(Container cont,
+      RMContainerEventType event) {
+    ContainerId containerId = cont.getId();
+    // Inform the container
+    RMContainer container = getRMContainer(containerId);
+    if (event.equals(RMContainerEventType.FINISHED)) {
+      // Have to send diagnostics for finished containers.
+      container.handle(new RMContainerFinishedEvent(containerId,
+          cont.getContainerStatus()));
+    } else {
+      container.handle(new RMContainerEvent(containerId, event));
     }
     }
-    queue.getMetrics().releaseResources(getUser(), 1,
-        containerResource);
+    LOG.info("Completed container: " + container + 
+        " in state: " + container.getState());
+    
+    // Remove from the list of containers
+    liveContainers.remove(container.getContainer());
+    
+    // Update usage metrics 
+    Resource containerResource = container.getContainer().getResource();
+    queue.getMetrics().releaseResources(getUser(), 1, containerResource);
     Resources.subtractFrom(currentConsumption, containerResource);
     Resources.subtractFrom(currentConsumption, containerResource);
   }
   }
 
 
   synchronized public void allocate(NodeType type, SchedulerNode node,
   synchronized public void allocate(NodeType type, SchedulerNode node,
-      Priority priority, ResourceRequest request, List<Container> containers) {
+      Priority priority, ResourceRequest request, 
+      List<RMContainer> containers) {
     // Update consumption and track allocations
     // Update consumption and track allocations
-    for (Container container : containers) {
-      Resources.addTo(currentConsumption, container.getResource());
-      LOG.debug("allocate: applicationId=" + container.getId().getAppId()
-          + " container=" + container.getId() + " host="
-          + container.getNodeId().toString());
+    List<Container> allocatedContainers = 
+        new ArrayList<Container>();
+    for (RMContainer container : containers) {
+      Container c = container.getContainer();
+      // Inform the container
+      container.handle(
+          new RMContainerEvent(c.getId(), RMContainerEventType.START));
+      allocatedContainers.add(c);
+      
+      Resources.addTo(currentConsumption, c.getResource());
+      LOG.debug("allocate: applicationId=" + c.getId().getAppId()
+          + " container=" + c.getId() + " host="
+          + c.getNodeId().toString());
+      
+      // Add it to allContainers list.
+      newlyAllocatedContainers.add(c);
+      liveContainers.put(c.getId(), container);
     }
     }
-    appSchedulingInfo.allocate(type, node, priority, request, containers);
+    
+    appSchedulingInfo.allocate(type, node, priority, 
+        request, allocatedContainers);
+  }
+  
+  synchronized public List<Container> pullNewlyAllocatedContainers() {
+    List<Container> allocatedContainers = newlyAllocatedContainers;
+    newlyAllocatedContainers = new ArrayList<Container>();
+    return allocatedContainers;
   }
   }
 
 
   public Resource getCurrentConsumption() {
   public Resource getCurrentConsumption() {
@@ -155,4 +227,8 @@ public class SchedulerApp {
     
     
     return limit;
     return limit;
   }
   }
+
+  public synchronized RMContainer getRMContainer(ContainerId id) {
+    return liveContainers.get(id);
+  }
 }
 }

+ 16 - 66
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

@@ -1,6 +1,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 
 import java.util.ArrayList;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.TreeMap;
@@ -31,9 +32,9 @@ public class SchedulerNode {
   private volatile int numContainers;
   private volatile int numContainers;
 
 
   /* set of containers that are allocated containers */
   /* set of containers that are allocated containers */
-  private final Map<ContainerId, Container> runningContainers = 
+  private final Map<ContainerId, Container> launchedContainers = 
     new TreeMap<ContainerId, Container>();
     new TreeMap<ContainerId, Container>();
-
+  
   private final RMNode rmNode;
   private final RMNode rmNode;
 
 
   public static final String ANY = "*";
   public static final String ANY = "*";
@@ -71,70 +72,11 @@ public class SchedulerNode {
    * @param containers allocated containers
    * @param containers allocated containers
    */
    */
   public synchronized void allocateContainer(ApplicationId applicationId, 
   public synchronized void allocateContainer(ApplicationId applicationId, 
-      List<Container> containers) {
-    if (containers == null) {
-      LOG.error("Adding null containers for application " + applicationId);
-      return;
-    }   
-    for (Container container : containers) {
-      allocateContainer(container);
-    }
-
-    LOG.info("addContainers:" +
-        " node=" + rmNode.getNodeAddress() + 
-        " #containers=" + containers.size() + 
-        " available=" + getAvailableResource().getMemory() + 
-        " used=" + getUsedResource().getMemory());
-  }
-
-  /**
-   * Status update from the NodeManager
-   * @param nodeStatus node status
-   * @return the set of containers no longer should be used by the
-   * node manager.
-   */
-  public synchronized void 
-    statusUpdate(Map<String,List<Container>> allContainers) {
-
-    if (allContainers == null) {
-      return;
-    }
-       
-    List<Container> listContainers = new ArrayList<Container>();
-    // Iterate through the running containers and update their status
-    for (Map.Entry<String, List<Container>> e : 
-      allContainers.entrySet()) {
-      listContainers.addAll(e.getValue());
-    }
-    update(listContainers);
-  }
-  
-  /**
-   * Status update for an application running on a given node
-   * @param node node
-   * @param containers containers update.
-   * @return containers that are completed or need to be preempted.
-   */
-  private synchronized void update(List<Container> containers) {
-    
-    for (Container container : containers) {
-    
-      if (container.getState() == ContainerState.COMPLETE) {
-        if (runningContainers.remove(container.getId()) != null) {
-          updateResource(container);
-          LOG.info("Completed container " + container);
-        }
-        LOG.info("Removed completed container " + container.getId() + " on node " + 
-            rmNode.getNodeAddress());
-      }
-    }
-  }
-  
-  private synchronized void allocateContainer(Container container) {
+      Container container) {
     deductAvailableResource(container.getResource());
     deductAvailableResource(container.getResource());
     ++numContainers;
     ++numContainers;
     
     
-    runningContainers.put(container.getId(), container);
+    launchedContainers.put(container.getId(), container);
     LOG.info("Allocated container " + container.getId() + 
     LOG.info("Allocated container " + container.getId() + 
         " to node " + rmNode.getNodeAddress());
         " to node " + rmNode.getNodeAddress());
     
     
@@ -154,7 +96,7 @@ public class SchedulerNode {
   }
   }
 
 
   private synchronized boolean isValidContainer(Container c) {    
   private synchronized boolean isValidContainer(Container c) {    
-    if (runningContainers.containsKey(c.getId()))
+    if (launchedContainers.containsKey(c.getId()))
       return true;
       return true;
     return false;
     return false;
   }
   }
@@ -178,7 +120,7 @@ public class SchedulerNode {
     
     
     /* remove the containers from the nodemanger */
     /* remove the containers from the nodemanger */
     
     
-    runningContainers.remove(container.getId());
+    launchedContainers.remove(container.getId());
     updateResource(container);
     updateResource(container);
 
 
     LOG.info("Released container " + container.getId() + 
     LOG.info("Released container " + container.getId() + 
@@ -211,8 +153,16 @@ public class SchedulerNode {
 
 
   @Override
   @Override
   public String toString() {
   public String toString() {
-    return "host: " + rmNode.getNodeAddress() + " #containers=" + rmNode.getNumContainers() +  
+    return "host: " + rmNode.getNodeAddress() + " #containers=" + getNumContainers() +  
       " available=" + getAvailableResource().getMemory() + 
       " available=" + getAvailableResource().getMemory() + 
       " used=" + getUsedResource().getMemory();
       " used=" + getUsedResource().getMemory();
   }
   }
+
+  public int getNumContainers() {
+    return numContainers;
+  }
+
+  public synchronized List<Container> getRunningContainers() {
+    return new ArrayList<Container>(launchedContainers.values());
+  }
 }
 }

+ 28 - 0
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java

@@ -0,0 +1,28 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * Node usage report.
+ */
+@Private
+@Stable
+public class SchedulerNodeReport {
+  private final Resource usedResources;
+  private final int numContainers;
+  
+  public SchedulerNodeReport(Resource used, int numContainers) {
+    this.usedResources = used;
+    this.numContainers = numContainers;
+  }
+
+  public Resource getUsedResources() {
+    return usedResources;
+  }
+
+  public int getNumContainers() {
+    return numContainers;
+  }
+}

+ 52 - 4
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java

@@ -21,7 +21,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import java.io.IOException;
 import java.io.IOException;
 import java.util.List;
 import java.util.List;
 
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -45,6 +49,8 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
    * @return queue information
    * @return queue information
    * @throws IOException
    * @throws IOException
    */
    */
+  @Public
+  @Stable
   public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
   public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
       boolean recursive) throws IOException;
       boolean recursive) throws IOException;
 
 
@@ -53,26 +59,68 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
    * @return acls for queues for current user
    * @return acls for queues for current user
    * @throws IOException
    * @throws IOException
    */
    */
+  @Public
+  @Stable
   public List<QueueUserACLInfo> getQueueUserAclInfo();
   public List<QueueUserACLInfo> getQueueUserAclInfo();
   
   
   /**
   /**
    * Get minimum allocatable {@link Resource}.
    * Get minimum allocatable {@link Resource}.
    * @return minimum allocatable resource
    * @return minimum allocatable resource
    */
    */
+  @Public
+  @Stable
   public Resource getMinimumResourceCapability();
   public Resource getMinimumResourceCapability();
   
   
   /**
   /**
    * Get maximum allocatable {@link Resource}.
    * Get maximum allocatable {@link Resource}.
    * @return maximum allocatable resource
    * @return maximum allocatable resource
    */
    */
+  @Public
+  @Stable
   public Resource getMaximumResourceCapability();
   public Resource getMaximumResourceCapability();
 
 
-  public Resource getResourceLimit(ApplicationAttemptId appAttemptId);
-
-  void allocate(ApplicationAttemptId appAttemptId, List<ResourceRequest> ask);
+  /**
+   * The main api between the ApplicationMaster and the Scheduler.
+   * The ApplicationMaster is updating his future resource requirements
+   * and may release containers he doens't need.
+   * 
+   * @param appAttemptId
+   * @param ask
+   * @param release
+   * @return the {@link Allocation} for the application
+   */
+  @Public
+  @Stable
+  Allocation 
+  allocate(ApplicationAttemptId appAttemptId, 
+      List<ResourceRequest> ask,
+      List<Container> release);
 
 
+  /**
+   * Get node resource usage report.
+   * @param nodeId
+   * @return the {@link SchedulerNodeReport} for the node
+   */
+  @Private
+  @Stable
+  public SchedulerNodeReport getNodeReport(NodeId nodeId);
+  
+  /**
+   * Get used resources on the node
+   * @param nodeId node
+   * @return used resources on the node
+   */
+  @Private
+  @Stable
   Resource getUsedResource(NodeId nodeId);
   Resource getUsedResource(NodeId nodeId);
 
 
+  /**
+   * Get available resources on the node
+   * @param nodeId node
+   * @return available resources on the node
+   */
+  @Private
+  @Stable
   Resource getAvailableResource(NodeId nodeId);
   Resource getAvailableResource(NodeId nodeId);
-
+  
 }
 }

+ 36 - 34
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -52,27 +52,25 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.ApplicationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerFinishedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
-import org.apache.hadoop.yarn.util.BuilderUtils;
 
 
 @LimitedPrivate("yarn")
 @LimitedPrivate("yarn")
 @Evolving
 @Evolving
@@ -380,36 +378,46 @@ implements ResourceScheduler, CapacitySchedulerContext {
     applications.remove(applicationAttemptId);
     applications.remove(applicationAttemptId);
   }
   }
 
 
+  private static final Allocation EMPTY_ALLOCATION = 
+      new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
+
   @Override
   @Override
   @Lock(Lock.NoLock.class)
   @Lock(Lock.NoLock.class)
-  public void allocate(ApplicationAttemptId applicationAttemptId,
-      List<ResourceRequest> ask) {
+  public Allocation allocate(ApplicationAttemptId applicationAttemptId,
+      List<ResourceRequest> ask, List<Container> release) {
 
 
     CSApp application = getApplication(applicationAttemptId);
     CSApp application = getApplication(applicationAttemptId);
     if (application == null) {
     if (application == null) {
       LOG.info("Calling allocate on removed " +
       LOG.info("Calling allocate on removed " +
           "or non existant application " + applicationAttemptId);
           "or non existant application " + applicationAttemptId);
-      return;
+      return EMPTY_ALLOCATION;
     }
     }
     
     
     // Sanity check
     // Sanity check
     normalizeRequests(ask);
     normalizeRequests(ask);
 
 
-    LOG.info("DEBUG --- allocate: pre-update" +
-        " applicationId=" + applicationAttemptId + 
-        " application=" + application);
-    application.showRequests();
+    synchronized (application) {
 
 
-    // Update application requests
-    application.updateResourceRequests(ask);
+      LOG.info("DEBUG --- allocate: pre-update" +
+          " applicationId=" + applicationAttemptId + 
+          " application=" + application);
+      application.showRequests();
 
 
-    LOG.info("DEBUG --- allocate: post-update");
-    application.showRequests();
-    
-    LOG.info("DEBUG --- allocate:" +
-        " applicationId=" + applicationAttemptId + 
-        " #ask=" + ask.size());
-   }
+      // Update application requests
+      application.updateResourceRequests(ask);
+
+      LOG.info("DEBUG --- allocate: post-update");
+      application.showRequests();
+
+      LOG.info("DEBUG --- allocate:" +
+          " applicationId=" + applicationAttemptId + 
+          " #ask=" + ask.size());
+      
+      return new Allocation(
+          application.pullNewlyAllocatedContainers(), 
+          application.getHeadroom());
+    }
+  }
 
 
   @Override
   @Override
   @Lock(Lock.NoLock.class)
   @Lock(Lock.NoLock.class)
@@ -480,7 +488,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
       Map<String,List<Container>> containers ) {
       Map<String,List<Container>> containers ) {
     LOG.info("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
     LOG.info("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
     SchedulerNode node = this.csNodes.get(nm.getNodeID());
     SchedulerNode node = this.csNodes.get(nm.getNodeID());
-    node.statusUpdate(containers);
+    //TODO node.statusUpdate(containers);
 
 
     // Completed containers
     // Completed containers
     processCompletedContainers(getCompletedContainers(containers));
     processCompletedContainers(getCompletedContainers(containers));
@@ -593,11 +601,6 @@ implements ResourceScheduler, CapacitySchedulerContext {
     return applications.get(applicationAttemptId);
     return applications.get(applicationAttemptId);
   }
   }
 
 
-  @Override
-  public Resource getResourceLimit(ApplicationAttemptId applicationAttemptId) {
-    return applications.get(applicationAttemptId).getHeadroom();
-  }
-
   @Override
   @Override
   public synchronized void handle(SchedulerEvent event) {
   public synchronized void handle(SchedulerEvent event) {
     switch(event.getType()) {
     switch(event.getType()) {
@@ -628,13 +631,6 @@ implements ResourceScheduler, CapacitySchedulerContext {
       doneApplication(appRemovedEvent.getApplicationAttemptID(),
       doneApplication(appRemovedEvent.getApplicationAttemptID(),
           appRemovedEvent.getFinalAttemptState());
           appRemovedEvent.getFinalAttemptState());
       break;
       break;
-    case CONTAINER_FINISHED:
-      ContainerFinishedSchedulerEvent containerFinishedEvent = (ContainerFinishedSchedulerEvent) event;
-      Container container = containerFinishedEvent.getContainer();
-      this.rmContext.getRMContainers().remove(container.getId());
-      processSingleCompletedContainer(container);
-      releaseContainer(container.getId().getAppId(), container);
-      break;
     default:
     default:
       LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
       LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
     }
     }
@@ -654,7 +650,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
     --numNodeManagers;
     --numNodeManagers;
 
 
     // Remove running containers
     // Remove running containers
-    List<Container> runningContainers = nodeInfo.getRunningContainers();
+    List<Container> runningContainers = null;//TODO = nodeInfo.getRunningContainers();
     killRunningContainers(runningContainers);
     killRunningContainers(runningContainers);
     
     
     // Remove reservations, if any
     // Remove reservations, if any
@@ -693,4 +689,10 @@ implements ResourceScheduler, CapacitySchedulerContext {
 //      }
 //      }
 //    }
 //    }
   }
   }
+
+  @Override
+  public SchedulerNodeReport getNodeReport(NodeId nodeId) {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }
 }

+ 17 - 13
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -919,28 +919,31 @@ public class LeafQueue implements Queue {
       Priority priority, ResourceRequest request, 
       Priority priority, ResourceRequest request, 
       CSNode node, List<Container> containers) {
       CSNode node, List<Container> containers) {
     // Allocate container to the application
     // Allocate container to the application
-    application.allocate(type, node, priority, request, containers);
+    // TODO: acm: refactor2 FIXME
+    application.allocate(type, node, priority, request, null);
 
 
     for (Container container : containers) {
     for (Container container : containers) {
       // Create the container and 'start' it.
       // Create the container and 'start' it.
       ContainerId containerId = container.getId();
       ContainerId containerId = container.getId();
       RMContext rmContext = this.scheduler.getRMContext();
       RMContext rmContext = this.scheduler.getRMContext();
       EventHandler eventHandler = rmContext.getDispatcher().getEventHandler();
       EventHandler eventHandler = rmContext.getDispatcher().getEventHandler();
-      RMContainer rmContainer = new RMContainerImpl(containerId, application
-          .getApplicationAttemptId(), node.getNodeID(), container,
+      RMContainer rmContainer = new RMContainerImpl(container, application
+          .getApplicationAttemptId(), node.getNodeID(),
           eventHandler, rmContext.getContainerAllocationExpirer());
           eventHandler, rmContext.getContainerAllocationExpirer());
-      if (rmContext.getRMContainers().putIfAbsent(containerId, rmContainer) != null) {
-        LOG.error("Duplicate container addition! ContainerID :  "
-            + containerId);
-      } else {
-        eventHandler.handle(new RMContainerEvent(containerId,
-            RMContainerEventType.START));
-      }
+      // TODO: FIX
+//      if (rmContext.getRMContainers().putIfAbsent(containerId, rmContainer) != null) {
+//        LOG.error("Duplicate container addition! ContainerID :  "
+//            + containerId);
+//      } else {
+//        eventHandler.handle(new RMContainerEvent(containerId,
+//            RMContainerEventType.START));
+//      }
     }
     }
 
 
     // Inform the NodeManager about the allocation
     // Inform the NodeManager about the allocation
-    node.allocateContainer(application.getApplicationId(),
-        containers);
+    // TODO: acm: refactor2 FIXME
+//    node.allocateContainer(application.getApplicationId(),
+//        containers);
   }
   }
 
 
   private void reserve(CSApp application, Priority priority, 
   private void reserve(CSApp application, Priority priority, 
@@ -968,7 +971,8 @@ public class LeafQueue implements Queue {
         
         
         // Inform the application - this might be an allocated container or
         // Inform the application - this might be an allocated container or
         // an unfulfilled reservation
         // an unfulfilled reservation
-        application.completedContainer(container, containerResource);
+        // TODO: acm: refactor2 FIXME
+        //application.completedContainer(container, containerResource);
         
         
         // Book-keeping
         // Book-keeping
         releaseResource(clusterResource, 
         releaseResource(clusterResource, 

+ 24 - 0
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerExpiredSchedulerEvent.java

@@ -0,0 +1,24 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+
+/**
+ * The {@link SchedulerEvent} which notifies that a {@link Container}
+ * has expired, sent by {@link ContainerAllocationExpirer} 
+ *
+ */
+public class ContainerExpiredSchedulerEvent extends SchedulerEvent {
+
+  private final Container container;
+  
+  public ContainerExpiredSchedulerEvent(Container container) {
+    super(SchedulerEventType.CONTAINER_EXPIRED);
+    this.container = container;
+  }
+
+  public Container getContainer() {
+    return container;
+  }
+
+}

+ 8 - 1
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerFinishedSchedulerEvent.java

@@ -1,18 +1,25 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
 
 
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 
 
 public class ContainerFinishedSchedulerEvent extends SchedulerEvent {
 public class ContainerFinishedSchedulerEvent extends SchedulerEvent {
 
 
   private final Container container;
   private final Container container;
+  private final RMContainerEventType cause;
 
 
-  public ContainerFinishedSchedulerEvent(Container container) {
+  public ContainerFinishedSchedulerEvent(Container container, RMContainerEventType cause) {
     super(SchedulerEventType.CONTAINER_FINISHED);
     super(SchedulerEventType.CONTAINER_FINISHED);
     this.container = container;
     this.container = container;
+    this.cause = cause;
   }
   }
 
 
   public Container getContainer() {
   public Container getContainer() {
     return container;
     return container;
   }
   }
 
 
+  public RMContainerEventType getCause() {
+    return cause;
+  }
+
 }
 }

+ 2 - 3
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java

@@ -6,12 +6,11 @@ public enum SchedulerEventType {
   NODE_ADDED,
   NODE_ADDED,
   NODE_REMOVED,
   NODE_REMOVED,
   NODE_UPDATE,
   NODE_UPDATE,
-
-  // Source: Container
-  CONTAINER_FINISHED,
   
   
   // Source: App
   // Source: App
   APP_ADDED,
   APP_ADDED,
   APP_REMOVED,
   APP_REMOVED,
 
 
+  // Source: ContainerAllocationExpirer
+  CONTAINER_EXPIRED
 }
 }

+ 143 - 117
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -37,10 +37,10 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.Lock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -61,10 +61,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -72,9 +72,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerFinishedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -169,11 +170,11 @@ public class FifoScheduler implements ResourceScheduler {
   };
   };
 
 
   public synchronized Resource getUsedResource(NodeId nodeId) {
   public synchronized Resource getUsedResource(NodeId nodeId) {
-    return nodes.get(nodeId).getUsedResource();
+    return getNode(nodeId).getUsedResource();
   }
   }
 
 
   public synchronized Resource getAvailableResource(NodeId nodeId) {
   public synchronized Resource getAvailableResource(NodeId nodeId) {
-    return nodes.get(nodeId).getAvailableResource();
+    return getNode(nodeId).getAvailableResource();
   }
   }
 
 
   @Override
   @Override
@@ -206,19 +207,26 @@ public class FifoScheduler implements ResourceScheduler {
     }
     }
   }
   }
 
 
+  private static final Allocation EMPTY_ALLOCATION = 
+      new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
   @Override
   @Override
-  public synchronized void allocate(ApplicationAttemptId applicationAttemptId,
-      List<ResourceRequest> ask) {
+  public Allocation allocate(ApplicationAttemptId applicationAttemptId,
+      List<ResourceRequest> ask, List<Container> release) {
     SchedulerApp application = getApplication(applicationAttemptId);
     SchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
     if (application == null) {
       LOG.error("Calling allocate on removed " +
       LOG.error("Calling allocate on removed " +
           "or non existant application " + applicationAttemptId);
           "or non existant application " + applicationAttemptId);
-      return;
+      return EMPTY_ALLOCATION;
     }
     }
 
 
     // Sanity check
     // Sanity check
     normalizeRequests(ask);
     normalizeRequests(ask);
-    
+
+    // Release containers
+    for (Container releasedContainer : release) {
+      completedContainer(releasedContainer, RMContainerEventType.RELEASED);
+    }
+
     synchronized (application) {
     synchronized (application) {
 
 
       LOG.debug("allocate: pre-update" +
       LOG.debug("allocate: pre-update" +
@@ -237,16 +245,13 @@ public class FifoScheduler implements ResourceScheduler {
       LOG.debug("allocate:" +
       LOG.debug("allocate:" +
           " applicationId=" + applicationAttemptId + 
           " applicationId=" + applicationAttemptId + 
           " #ask=" + ask.size());
           " #ask=" + ask.size());
+      
+      return new Allocation(
+          application.pullNewlyAllocatedContainers(), 
+          application.getHeadroom());
     }
     }
   }
   }
 
 
-  @Override
-  public Resource getResourceLimit(ApplicationAttemptId applicationAttemptId) {
-    SchedulerApp application = getApplication(applicationAttemptId);
-    // TODO: What if null?
-    return application.getHeadroom();
-  }
-  
   private void normalizeRequests(List<ResourceRequest> asks) {
   private void normalizeRequests(List<ResourceRequest> asks) {
     for (ResourceRequest ask : asks) {
     for (ResourceRequest ask : asks) {
       normalizeRequest(ask);
       normalizeRequest(ask);
@@ -256,9 +261,9 @@ public class FifoScheduler implements ResourceScheduler {
   private void normalizeRequest(ResourceRequest ask) {
   private void normalizeRequest(ResourceRequest ask) {
     int memory = ask.getCapability().getMemory();
     int memory = ask.getCapability().getMemory();
     // FIXME: TestApplicationCleanup is relying on unnormalized behavior.
     // FIXME: TestApplicationCleanup is relying on unnormalized behavior.
-    //ask.capability.memory = MINIMUM_MEMORY *
     memory = MINIMUM_MEMORY *
     memory = MINIMUM_MEMORY *
     ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
     ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
+    ask.setCapability(Resources.createResource(memory));
   }
   }
 
 
   private synchronized SchedulerApp getApplication(
   private synchronized SchedulerApp getApplication(
@@ -266,6 +271,10 @@ public class FifoScheduler implements ResourceScheduler {
     return applications.get(applicationAttemptId);
     return applications.get(applicationAttemptId);
   }
   }
 
 
+  private synchronized SchedulerNode getNode(NodeId nodeId) {
+    return nodes.get(nodeId);
+  }
+  
   private synchronized void addApplication(ApplicationAttemptId appAttemptId,
   private synchronized void addApplication(ApplicationAttemptId appAttemptId,
       String queueName, String user) {
       String queueName, String user) {
     AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
     AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
@@ -297,7 +306,7 @@ public class FifoScheduler implements ResourceScheduler {
     // Remove the application
     // Remove the application
     applications.remove(applicationAttemptId);
     applications.remove(applicationAttemptId);
   }
   }
-
+  
   /**
   /**
    * Heart of the scheduler...
    * Heart of the scheduler...
    * 
    * 
@@ -473,15 +482,18 @@ public class FifoScheduler implements ResourceScheduler {
       Math.min(assignableContainers, availableContainers);
       Math.min(assignableContainers, availableContainers);
 
 
     if (assignedContainers > 0) {
     if (assignedContainers > 0) {
-      List<Container> containers =
-        new ArrayList<Container>(assignedContainers);
       for (int i=0; i < assignedContainers; ++i) {
       for (int i=0; i < assignedContainers; ++i) {
+        // Create the container
         Container container =
         Container container =
             BuilderUtils.newContainer(recordFactory,
             BuilderUtils.newContainer(recordFactory,
                 application.getApplicationAttemptId(),
                 application.getApplicationAttemptId(),
                 application.getNewContainerId(),
                 application.getNewContainerId(),
                 node.getRMNode().getNodeID(), node.getRMNode().getNodeAddress(),
                 node.getRMNode().getNodeID(), node.getRMNode().getNodeAddress(),
                 node.getRMNode().getHttpAddress(), capability);
                 node.getRMNode().getHttpAddress(), capability);
+        RMContainer rmContainer = 
+            new RMContainerImpl(container, 
+                application.getApplicationAttemptId(), node.getNodeID(), 
+                null, this.rmContext.getContainerAllocationExpirer());
         
         
         // If security is enabled, send the container-tokens too.
         // If security is enabled, send the container-tokens too.
         if (UserGroupInformation.isSecurityEnabled()) {
         if (UserGroupInformation.isSecurityEnabled()) {
@@ -499,100 +511,84 @@ public class FifoScheduler implements ResourceScheduler {
           containerToken.setService(container.getNodeId().toString());
           containerToken.setService(container.getNodeId().toString());
           container.setContainerToken(containerToken);
           container.setContainerToken(containerToken);
         }
         }
-        containers.add(container);
+        
+        // Allocate!
+        application.allocate(type, node, priority, request, 
+            Collections.singletonList(rmContainer));
+        node.allocateContainer(application.getApplicationId(), 
+            container);
       }
       }
-      application.allocate(type, node, priority, request, containers);
-      addAllocatedContainers(node, application.getApplicationAttemptId(),
-          containers);
+      
+      // Update total usage
       Resources.addTo(usedResource,
       Resources.addTo(usedResource,
           Resources.multiply(capability, assignedContainers));
           Resources.multiply(capability, assignedContainers));
     }
     }
+    
     return assignedContainers;
     return assignedContainers;
   }
   }
 
 
-  private synchronized void killContainers(List<Container> containers) {
-    applicationCompletedContainers(containers);
-  }
-
-  private synchronized void applicationCompletedContainers(List<Container> containers) {
-    for (Container c : containers) {
-      applicationCompletedContainer(c);
-    }
-  }
-
-  private synchronized void applicationCompletedContainer(Container c) {
-      SchedulerApp app = applications.get(c.getId().getAppAttemptId());
-      /** this is possible, since an application can be removed from scheduler but
-       * the nodemanger is just updating about a completed container.
-       */
-      if (app != null) {
-        app.completedContainer(c, c.getResource());
-      }
-  }
-
-  private List<Container> getCompletedContainers(Map<String, List<Container>> allContainers) {
-    if (allContainers == null) {
-      return new ArrayList<Container>();
-    }
-    List<Container> completedContainers = new ArrayList<Container>();
-    // Iterate through the running containers and update their status
-    for (Map.Entry<String, List<Container>> e : 
-      allContainers.entrySet()) {
-      for (Container c: e.getValue()) {
-        if (c.getState() == ContainerState.COMPLETE) {
-          completedContainers.add(c);
+  private synchronized void nodeUpdate(RMNode rmNode,
+      Map<ApplicationId, List<Container>> containers) {
+    SchedulerNode node = getNode(rmNode.getNodeID());
+    
+    // Process completed containers
+    for (List<Container> appContainers : containers.values()) {
+      for (Container container : appContainers) {
+        if (container.getContainerStatus().getState() == ContainerState.RUNNING
+            || container.getContainerStatus().getState() == ContainerState.INITIALIZING) {
+          launchContainer(container, node);
+        } else { // has to COMPLETE
+          completedContainer(container, RMContainerEventType.FINISHED);
         }
         }
       }
       }
     }
     }
-    return completedContainers;
-  }
-
-  private synchronized void nodeUpdate(RMNode rmNode,
-      Map<String, List<Container>> containers) {
-    SchedulerNode node = this.nodes.get(rmNode.getNodeID());
-    node.statusUpdate(containers);
-
-    applicationCompletedContainers(getCompletedContainers(containers));
 
 
-    LOG.info("Node heartbeat " + rmNode.getNodeID() + " resource = " + node.getAvailableResource());
+    LOG.info("Node heartbeat " + rmNode.getNodeID() + 
+        " available resource = " + node.getAvailableResource());
+    
     if (Resources.greaterThanOrEqual(node.getAvailableResource(),
     if (Resources.greaterThanOrEqual(node.getAvailableResource(),
         minimumAllocation)) {
         minimumAllocation)) {
       assignContainers(node);
       assignContainers(node);
     }
     }
+    
     metrics.setAvailableResourcesToQueue(
     metrics.setAvailableResourcesToQueue(
         Resources.subtract(clusterResource, usedResource));
         Resources.subtract(clusterResource, usedResource));
     LOG.info("Node after allocation " + rmNode.getNodeID() + " resource = "
     LOG.info("Node after allocation " + rmNode.getNodeID() + " resource = "
         + node.getAvailableResource());
         + node.getAvailableResource());
-
-    // TODO: Add the list of containers to be preempted when we support
   }  
   }  
 
 
   @Override
   @Override
   public synchronized void handle(SchedulerEvent event) {
   public synchronized void handle(SchedulerEvent event) {
     switch(event.getType()) {
     switch(event.getType()) {
     case NODE_ADDED:
     case NODE_ADDED:
+    {
       NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
       NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
       addNode(nodeAddedEvent.getAddedRMNode());
       addNode(nodeAddedEvent.getAddedRMNode());
-      break;
+    }
+    break;
     case NODE_REMOVED:
     case NODE_REMOVED:
+    {
       NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
       NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
       removeNode(nodeRemovedEvent.getRemovedRMNode());
       removeNode(nodeRemovedEvent.getRemovedRMNode());
-      break;
+    }
+    break;
     case NODE_UPDATE:
     case NODE_UPDATE:
-      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
-      Map<ApplicationId, List<Container>> contAppMapping = nodeUpdatedEvent.getContainers();
-      Map<String, List<Container>> conts = new HashMap<String, List<Container>>();
-      for (Map.Entry<ApplicationId, List<Container>> entry : contAppMapping.entrySet()) {
-        conts.put(entry.getKey().toString(), entry.getValue());
-      }
-      nodeUpdate(nodeUpdatedEvent.getRMNode(), conts);
-      break;
+    {
+      NodeUpdateSchedulerEvent nodeUpdatedEvent = 
+      (NodeUpdateSchedulerEvent)event;
+      nodeUpdate(nodeUpdatedEvent.getRMNode(), 
+          nodeUpdatedEvent.getContainers());
+    }
+    break;
     case APP_ADDED:
     case APP_ADDED:
+    {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
       addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
           .getQueue(), appAddedEvent.getUser());
           .getQueue(), appAddedEvent.getUser());
-      break;
+    }
+    break;
     case APP_REMOVED:
     case APP_REMOVED:
+    {
       AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
       AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
       try {
       try {
         doneApplication(appRemovedEvent.getApplicationAttemptID(),
         doneApplication(appRemovedEvent.getApplicationAttemptID(),
@@ -601,30 +597,82 @@ public class FifoScheduler implements ResourceScheduler {
         LOG.error("Unable to remove application "
         LOG.error("Unable to remove application "
             + appRemovedEvent.getApplicationAttemptID(), ie);
             + appRemovedEvent.getApplicationAttemptID(), ie);
       }
       }
-      break;
-    case CONTAINER_FINISHED:
-      ContainerFinishedSchedulerEvent containerFinishedEvent = (ContainerFinishedSchedulerEvent) event;
-      Container container = containerFinishedEvent.getContainer();
-      applicationCompletedContainer(container);
-      this.rmContext.getRMContainers().remove(container.getId());
-      releaseContainer(container.getId().getAppId(), container);
-      break;
+    }
+    break;
+    case CONTAINER_EXPIRED:
+    {
+      ContainerExpiredSchedulerEvent containerExpiredEvent = 
+          (ContainerExpiredSchedulerEvent) event;
+      completedContainer(containerExpiredEvent.getContainer(), 
+          RMContainerEventType.EXPIRE);
+    }
+    break;
     default:
     default:
       LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
       LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
     }
     }
   }
   }
 
 
+  private void launchContainer(Container container, SchedulerNode node) {
+    // Get the application for the finished container
+    ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
+    SchedulerApp application = getApplication(applicationAttemptId);
+    if (application == null) {
+      LOG.info("Unknown application: " + applicationAttemptId + 
+          " launched container " + container.getId() +
+          " on node: " + node);
+      return;
+    }
+    
+    application.launchContainer(container.getId());
+  }
 
 
+  @Lock(FifoScheduler.class)
+  private synchronized void completedContainer(Container container, RMContainerEventType event) {
+    // Get the application for the finished container
+    ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
+    SchedulerApp application = getApplication(applicationAttemptId);
+    
+    // Get the node on which the container was allocated
+    SchedulerNode node = getNode(container.getNodeId());
+    
+    if (application == null) {
+      LOG.info("Unknown application: " + applicationAttemptId + 
+          " released container " + container.getId() +
+          " on node: " + node + 
+          " with event: " + event);
+      return;
+    }
+    
+    // Inform the application
+    application.completedContainer(container, event);
+    
+    // Inform the node
+    node.releaseContainer(container);
+
+    LOG.info("Application " + applicationAttemptId + 
+        " released container " + container.getId() +
+        " on node: " + node + 
+        " with event: " + event);
+     
+  }
+  
   private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
   private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
   private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
 
 
   private synchronized void removeNode(RMNode nodeInfo) {
   private synchronized void removeNode(RMNode nodeInfo) {
+    SchedulerNode node = getNode(nodeInfo.getNodeID());
+    // Kill running containers
+    for(Container container : node.getRunningContainers()) {
+      completedContainer(container, RMContainerEventType.KILL);
+    }
+    
+    //Remove the node
     this.nodes.remove(nodeInfo.getNodeID());
     this.nodes.remove(nodeInfo.getNodeID());
+    
+    // Update cluster metrics
     Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
     Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
-    killContainers(nodeInfo.getRunningContainers());
   }
   }
 
 
-
   @Override
   @Override
   public QueueInfo getQueueInfo(String queueName,
   public QueueInfo getQueueInfo(String queueName,
       boolean includeChildQueues, boolean recursive) {
       boolean includeChildQueues, boolean recursive) {
@@ -641,35 +689,6 @@ public class FifoScheduler implements ResourceScheduler {
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
   }
   }
 
 
-  private synchronized void releaseContainer(ApplicationId applicationId, 
-      Container container) {
-    // Reap containers
-    LOG.info("Application " + applicationId + " released container " +
-        container.getId());
-    nodes.get(container.getNodeId()).releaseContainer(container);
-  }
-
-  private synchronized void addAllocatedContainers(SchedulerNode node,
-      ApplicationAttemptId appAttemptId, List<Container> containers) {
-    node.allocateContainer(appAttemptId.getApplicationId(), containers);
-    for (Container container : containers) {
-      // Create the container and 'start' it.
-      ContainerId containerId = container.getId();
-      RMContainer rmContainer = new RMContainerImpl(containerId,
-          appAttemptId, node.getNodeID(), container, this.rmContext
-              .getDispatcher().getEventHandler(), this.rmContext
-              .getContainerAllocationExpirer());
-      if (this.rmContext.getRMContainers().putIfAbsent(containerId,
-          rmContainer) != null) {
-        LOG.error("Duplicate container addition! ContainerID :  "
-            + containerId);
-      } else {
-        this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMContainerEvent(containerId, RMContainerEventType.START));
-      }
-    }
-  }
-
   @Override
   @Override
   public void recover(RMState state) {
   public void recover(RMState state) {
 //    for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet()) {
 //    for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet()) {
@@ -679,4 +698,11 @@ public class FifoScheduler implements ResourceScheduler {
 //      app.allocate(appInfo.getContainers());
 //      app.allocate(appInfo.getContainers());
 //    }
 //    }
   }
   }
+
+  @Override
+  public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
+    SchedulerNode node = getNode(nodeId);
+    return new SchedulerNodeReport(
+        node.getUsedResource(), node.getNumContainers());
+  }
 }
 }

+ 2 - 1
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NodesPage.java

@@ -71,7 +71,8 @@ class NodesPage extends RmView {
             td(health.getIsNodeHealthy() ? "Healthy" : "Unhealthy").
             td(health.getIsNodeHealthy() ? "Healthy" : "Unhealthy").
             td(Times.format(health.getLastHealthReportTime())).
             td(Times.format(health.getLastHealthReportTime())).
             td(String.valueOf(health.getHealthReport())).
             td(String.valueOf(health.getHealthReport())).
-            td(String.valueOf(ni.getNumContainers())).
+            // TODO: acm: refactor2 FIXME
+            //td(String.valueOf(ni.getNumContainers())).
             // TODO: FIXME Vinodkv
             // TODO: FIXME Vinodkv
 //            td(String.valueOf(ni.getUsedResource().getMemory())).
 //            td(String.valueOf(ni.getUsedResource().getMemory())).
 //            td(String.valueOf(ni.getAvailableResource().getMemory())).
 //            td(String.valueOf(ni.getAvailableResource().getMemory())).