Explorar el Código

CS is done (I hope! :) )

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1153453 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli hace 14 años
padre
commit
9afee8ea72
Se han modificado 15 ficheros con 852 adiciones y 782 borrados
  1. 9 0
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
  2. 1 0
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
  3. 48 1
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
  4. 41 0
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java
  5. 9 1
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerState.java
  6. 25 102
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
  7. 199 50
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
  8. 68 7
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
  9. 0 139
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSApp.java
  10. 0 70
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSNode.java
  11. 156 159
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  12. 226 202
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  13. 19 15
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
  14. 13 11
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
  15. 38 25
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

+ 9 - 0
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java

@@ -3,6 +3,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 public interface RMContainer extends EventHandler<RMContainerEvent> {
@@ -14,5 +17,11 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
   RMContainerState getState();
 
   Container getContainer();
+
+  Resource getReservedResource();
+
+  NodeId getReservedNode();
   
+  Priority getReservedPriority();
+
 }

+ 1 - 0
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java

@@ -8,6 +8,7 @@ public enum RMContainerEventType {
   // Source: SchedulerApp
   ACQUIRED,
   KILL, // Also from Node on NodeRemoval
+  RESERVED,
 
   LAUNCHED,
   FINISHED,

+ 48 - 1
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java

@@ -11,6 +11,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
@@ -36,6 +38,19 @@ public class RMContainerImpl implements RMContainer {
         RMContainerEventType.START, new ContainerStartedTransition())
     .addTransition(RMContainerState.NEW, RMContainerState.KILLED,
         RMContainerEventType.KILL)
+    .addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
+        RMContainerEventType.RESERVED, new ContainerReservedTransition())
+
+    // Transitions from RESERVED state
+    .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, 
+        RMContainerEventType.RESERVED, new ContainerReservedTransition())
+    .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, 
+        RMContainerEventType.START, new ContainerStartedTransition())
+    .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, 
+        RMContainerEventType.KILL) // nothing to do
+    .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, 
+        RMContainerEventType.RELEASED) // nothing to do
+       
 
     // Transitions from ALLOCATED state
     .addTransition(RMContainerState.ALLOCATED, RMContainerState.ACQUIRED,
@@ -95,6 +110,10 @@ public class RMContainerImpl implements RMContainer {
   private final EventHandler eventHandler;
   private final ContainerAllocationExpirer containerAllocationExpirer;
 
+  private Resource reservedResource;
+  private NodeId reservedNode;
+  private Priority reservedPriority;
+
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId,
       EventHandler handler,
@@ -138,6 +157,21 @@ public class RMContainerImpl implements RMContainer {
     }
   }
 
+  /**
+   * Resource recorded for this container's reservation.
+   *
+   * @return the value stored by {@link ContainerReservedTransition};
+   *         <code>null</code> if no reservation has been recorded yet
+   */
+  @Override
+  public Resource getReservedResource() {
+    return this.reservedResource;
+  }
+
+  /**
+   * Node recorded for this container's reservation.
+   *
+   * @return the value stored by {@link ContainerReservedTransition};
+   *         <code>null</code> if no reservation has been recorded yet
+   */
+  @Override
+  public NodeId getReservedNode() {
+    return this.reservedNode;
+  }
+
+  /**
+   * Priority recorded for this container's reservation.
+   *
+   * @return the value stored by {@link ContainerReservedTransition};
+   *         <code>null</code> if no reservation has been recorded yet
+   */
+  @Override
+  public Priority getReservedPriority() {
+    return this.reservedPriority;
+  }
+  
   @Override
   public void handle(RMContainerEvent event) {
     LOG.info("Processing " + event.getContainerId() + " of type " + event.getType());
@@ -171,6 +205,19 @@ public class RMContainerImpl implements RMContainer {
     }
   }
 
+  /**
+   * Transition run on a RESERVED event: copies the reserved resource, node
+   * and priority carried by the event onto the container.
+   */
+  private static final class ContainerReservedTransition extends
+      BaseTransition {
+
+    @Override
+    public void transition(RMContainerImpl container, RMContainerEvent event) {
+      // This transition is registered for RMContainerEventType.RESERVED;
+      // the event is expected to be an RMContainerReservedEvent.
+      final RMContainerReservedEvent reserved =
+          (RMContainerReservedEvent) event;
+      container.reservedPriority = reserved.getReservedPriority();
+      container.reservedNode = reserved.getReservedNode();
+      container.reservedResource = reserved.getReservedResource();
+    }
+  }
+
+
   private static final class ContainerStartedTransition extends
       BaseTransition {
 
@@ -179,7 +226,7 @@ public class RMContainerImpl implements RMContainer {
       container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent(
           container.appAttemptId, container.container));
     }
-}
+  }
 
   private static final class AcquiredTransition extends BaseTransition {
 

+ 41 - 0
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java

@@ -0,0 +1,41 @@
+package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * Event announcing that a container has been reserved.
+ * 
+ * Besides the container id it carries the reserved resource, the node on
+ * which the reservation is in effect and the priority of the reservation.
+ * The event type is always {@link RMContainerEventType#RESERVED}.
+ */
+public class RMContainerReservedEvent extends RMContainerEvent {
+
+  private final Priority reservedPriority;
+  private final NodeId reservedNode;
+  private final Resource reservedResource;
+
+  /**
+   * @param containerId id of the container being reserved
+   * @param reservedResource amount of resource reserved
+   * @param reservedNode node on which the reservation is made
+   * @param reservedPriority priority of the reservation
+   */
+  public RMContainerReservedEvent(ContainerId containerId,
+      Resource reservedResource, NodeId reservedNode, 
+      Priority reservedPriority) {
+    super(containerId, RMContainerEventType.RESERVED);
+    this.reservedPriority = reservedPriority;
+    this.reservedNode = reservedNode;
+    this.reservedResource = reservedResource;
+  }
+
+  /** @return the amount of resource reserved */
+  public Resource getReservedResource() {
+    return this.reservedResource;
+  }
+
+  /** @return the node on which the reservation is in effect */
+  public NodeId getReservedNode() {
+    return this.reservedNode;
+  }
+
+  /** @return the priority of the reservation */
+  public Priority getReservedPriority() {
+    return this.reservedPriority;
+  }
+
+}

+ 9 - 1
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerState.java

@@ -1,5 +1,13 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
 public enum RMContainerState {
-  NEW, ALLOCATED, ACQUIRED, RUNNING, COMPLETED, EXPIRED, RELEASED, KILLED
+  NEW, 
+  RESERVED, 
+  ALLOCATED, 
+  ACQUIRED, 
+  RUNNING, 
+  COMPLETED, 
+  EXPIRED, 
+  RELEASED, 
+  KILLED
 }

+ 25 - 102
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -18,11 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,8 +32,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationMaster;
-import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -74,11 +69,7 @@ public class AppSchedulingInfo {
 
   private final ApplicationStore store;
 
-  /* Current consumption */
-  List<Container> acquired = new ArrayList<Container>();
-  List<Container> completedContainers = new ArrayList<Container>();
   /* Allocated by scheduler */
-  List<Container> allocated = new ArrayList<Container>();
   boolean pending = true; // for app metrics
 
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
@@ -123,38 +114,6 @@ public class AppSchedulingInfo {
     return this.containerIdCounter.incrementAndGet();
   }
 
-  /**
-   * the currently acquired/allocated containers by the application masters.
-   * 
-   * @return the current containers being used by the application masters.
-   */
-  public synchronized List<Container> getCurrentContainers() {
-    List<Container> currentContainers = new ArrayList<Container>(acquired);
-    currentContainers.addAll(allocated);
-    return currentContainers;
-  }
-
-  /**
-   * The ApplicationMaster is acquiring the allocated/completed resources.
-   * 
-   * @return allocated resources
-   */
-  synchronized private List<Container> acquire() {
-    // Return allocated containers
-    acquired.addAll(allocated);
-    List<Container> heartbeatContainers = allocated;
-    allocated = new ArrayList<Container>();
-
-    LOG.info("acquire:" + " application=" + applicationId + " #acquired="
-        + heartbeatContainers.size());
-    heartbeatContainers = (heartbeatContainers == null) ? new ArrayList<Container>()
-        : heartbeatContainers;
-
-    heartbeatContainers.addAll(completedContainers);
-    completedContainers.clear();
-    return heartbeatContainers;
-  }
-
   /**
    * The ApplicationMaster is updating resource requirements for the
    * application, by asking for more resources and releasing resources acquired
@@ -204,24 +163,6 @@ public class AppSchedulingInfo {
     }
   }
 
-  private synchronized void releaseContainers(List<Container> release) {
-    // Release containers and update consumption
-    for (Container container : release) {
-      LOG.debug("update: " + "application=" + applicationId + " released="
-          + container);
-      // TOday in all code paths, this is taken by completedContainer called by
-      // the caller. So commenting this.
-      // Resources.subtractFrom(currentConsumption, container.getResource());
-      for (Iterator<Container> i = acquired.iterator(); i.hasNext();) {
-        Container c = i.next();
-        if (c.getId().equals(container.getId())) {
-          i.remove();
-          LOG.info("Removed acquired container: " + container.getId());
-        }
-      }
-    }
-  }
-
   synchronized public Collection<Priority> getPriorities() {
     return priorities;
   }
@@ -242,15 +183,6 @@ public class AppSchedulingInfo {
     return request.getCapability();
   }
 
-  synchronized private void completedContainer(Container container, 
-      Resource containerResource) {
-    if (container != null) {
-      LOG.info("Completed container: " + container);
-      completedContainers.add(container);
-    }
-    queue.getMetrics().releaseResources(user, 1, containerResource);
-  }
-
   /**
    * Resources have been allocated to this application by the resource
    * scheduler. Track them.
@@ -263,17 +195,17 @@ public class AppSchedulingInfo {
    *          the priority of the request.
    * @param request
    *          the request
-   * @param containers
+   * @param container
    *          the container allocated.
    */
   synchronized public void allocate(NodeType type, SchedulerNode node,
-      Priority priority, ResourceRequest request, List<Container> containers) {
+      Priority priority, ResourceRequest request, Container container) {
     if (type == NodeType.DATA_LOCAL) {
-      allocateNodeLocal(node, priority, request, containers);
+      allocateNodeLocal(node, priority, request, container);
     } else if (type == NodeType.RACK_LOCAL) {
-      allocateRackLocal(node, priority, request, containers);
+      allocateRackLocal(node, priority, request, container);
     } else {
-      allocateOffSwitch(node, priority, request, containers);
+      allocateOffSwitch(node, priority, request, container);
     }
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
@@ -284,7 +216,7 @@ public class AppSchedulingInfo {
     }
     LOG.debug("allocate: user: " + user + ", memory: "
         + request.getCapability());
-    metrics.allocateResources(user, containers.size(), request.getCapability());
+    metrics.allocateResources(user, 1, request.getCapability());
   }
 
   /**
@@ -295,21 +227,19 @@ public class AppSchedulingInfo {
    *          resources allocated to the application
    */
   synchronized private void allocateNodeLocal(SchedulerNode node, Priority priority,
-      ResourceRequest nodeLocalRequest, List<Container> containers) {
+      ResourceRequest nodeLocalRequest, Container container) {
     // Update consumption and track allocations
-    allocate(containers);
+    allocate(container);
 
     // Update future requirements
-    nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers()
-        - containers.size());
+    nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
     if (nodeLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getNodeAddress());
     }
 
     ResourceRequest rackLocalRequest = requests.get(priority).get(
         node.getRackName());
-    rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers()
-        - containers.size());
+    rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
     if (rackLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getRackName());
     }
@@ -317,8 +247,7 @@ public class AppSchedulingInfo {
     // Do not remove ANY
     ResourceRequest offSwitchRequest = requests.get(priority).get(
         RMNode.ANY);
-    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
-        - containers.size());
+    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
   }
 
   /**
@@ -329,14 +258,13 @@ public class AppSchedulingInfo {
    *          resources allocated to the application
    */
   synchronized private void allocateRackLocal(SchedulerNode node, Priority priority,
-      ResourceRequest rackLocalRequest, List<Container> containers) {
+      ResourceRequest rackLocalRequest, Container container) {
 
     // Update consumption and track allocations
-    allocate(containers);
+    allocate(container);
 
     // Update future requirements
-    rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers()
-        - containers.size());
+    rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
     if (rackLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getRackName());
     }
@@ -344,8 +272,7 @@ public class AppSchedulingInfo {
     // Do not remove ANY
     ResourceRequest offSwitchRequest = requests.get(priority).get(
         RMNode.ANY);
-    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
-        - containers.size());
+    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
   }
 
   /**
@@ -356,33 +283,29 @@ public class AppSchedulingInfo {
    *          resources allocated to the application
    */
   synchronized private void allocateOffSwitch(SchedulerNode node, Priority priority,
-      ResourceRequest offSwitchRequest, List<Container> containers) {
+      ResourceRequest offSwitchRequest, Container container) {
 
     // Update consumption and track allocations
-    allocate(containers);
+    allocate(container);
 
     // Update future requirements
 
     // Do not remove ANY
-    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers()
-        - containers.size());
+    offSwitchRequest.setNumContainers(offSwitchRequest.getNumContainers() - 1);
   }
 
-  synchronized private void allocate(List<Container> containers) {
+  synchronized private void allocate(Container container) {
     // Update consumption and track allocations
-    for (Container container : containers) {
-
-      allocated.add(container);
-      //TODO: fixme sharad
-     /* try {
+    //TODO: fixme sharad
+    /* try {
         store.storeContainer(container);
       } catch (IOException ie) {
         // TODO fix this. we shouldnt ignore
       }*/
-      LOG.debug("allocate: applicationId=" + applicationId + " container="
-          + container.getId() + " host="
-          + container.getNodeId().toString());
-    }
+    
+    LOG.debug("allocate: applicationId=" + applicationId + " container="
+        + container.getId() + " host="
+        + container.getNodeId().toString());
   }
 
   synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {

+ 199 - 50
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java

@@ -12,17 +12,22 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 public class SchedulerApp {
 
@@ -44,7 +49,18 @@ public class SchedulerApp {
   private List<RMContainer> newlyAllocatedContainers = 
       new ArrayList<RMContainer>();
 
-  public SchedulerApp(AppSchedulingInfo application, Queue queue) {
+  final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
+      new HashMap<Priority, Map<NodeId, RMContainer>>();
+  
+  Map<Priority, Integer> schedulingOpportunities = new HashMap<Priority, Integer>();
+
+  final Resource currentReservation = recordFactory
+      .newRecordInstance(Resource.class);
+
+  private final RMContext rmContext;
+  public SchedulerApp(RMContext rmContext, 
+      AppSchedulingInfo application, Queue queue) {
+    this.rmContext = rmContext;
     this.appSchedulingInfo = application;
     this.queue = queue;
     application.setQueue(queue);
@@ -75,11 +91,6 @@ public class SchedulerApp {
     return this.appSchedulingInfo.getNewContainerId();
   }
   
-  @Deprecated
-  public List<Container> getCurrentContainers() {
-    return this.appSchedulingInfo.getCurrentContainers();
-  }
-  
   public Collection<Priority> getPriorities() {
     return this.appSchedulingInfo.getPriorities();
   }
@@ -88,6 +99,10 @@ public class SchedulerApp {
     return this.appSchedulingInfo.getResourceRequest(priority, nodeAddress);
   }
 
+  /**
+   * Container count currently recorded on the {@link RMNode#ANY} request
+   * at the given priority.
+   *
+   * @param priority priority to look up
+   * @return number of containers on the <code>ANY</code> request
+   */
+  public synchronized int getTotalRequiredResources(Priority priority) {
+    ResourceRequest anyRequest = getResourceRequest(priority, RMNode.ANY);
+    return anyRequest.getNumContainers();
+  }
+  
   public Resource getResource(Priority priority) {
     return this.appSchedulingInfo.getResource(priority);
   }
@@ -100,10 +115,6 @@ public class SchedulerApp {
     return this.appSchedulingInfo.getQueueName();
   }
 
-  public Queue getQueue() {
-    return this.queue;
-  }
-
   public synchronized Collection<RMContainer> getLiveContainers() {
     return new ArrayList<RMContainer>(liveContainers.values());
   }
@@ -126,61 +137,66 @@ public class SchedulerApp {
       SchedulerApp application) {
   }
 
-  synchronized public void containerCompleted(Container cont,
+  synchronized public void containerCompleted(RMContainer rmContainer,
       RMContainerEventType event) {
-    ContainerId containerId = cont.getId();
+    
+    Container container = rmContainer.getContainer();
+    ContainerId containerId = container.getId();
+    
     // Inform the container
-    RMContainer container = getRMContainer(containerId);
-
-    if (container == null) {
-      LOG.error("Invalid container completed " + cont.getId());
-      return;
-    }
-
     if (event.equals(RMContainerEventType.FINISHED)) {
       // Have to send diagnostics for finished containers.
-      container.handle(new RMContainerFinishedEvent(containerId,
-          cont.getContainerStatus()));
+      rmContainer.handle(new RMContainerFinishedEvent(containerId,
+          container.getContainerStatus()));
     } else {
-      container.handle(new RMContainerEvent(containerId, event));
+      rmContainer.handle(new RMContainerEvent(containerId, event));
     }
-    LOG.info("Completed container: " + container.getContainerId() + 
-        " in state: " + container.getState());
+    LOG.info("Completed container: " + rmContainer.getContainerId() + 
+        " in state: " + rmContainer.getState());
     
     // Remove from the list of containers
-    liveContainers.remove(container.getContainerId());
+    liveContainers.remove(rmContainer.getContainerId());
     
     // Update usage metrics 
-    Resource containerResource = container.getContainer().getResource();
+    Resource containerResource = rmContainer.getContainer().getResource();
     queue.getMetrics().releaseResources(getUser(), 1, containerResource);
     Resources.subtractFrom(currentConsumption, containerResource);
   }
 
-  synchronized public void allocate(NodeType type, SchedulerNode node,
+  synchronized public RMContainer allocate(NodeType type, SchedulerNode node,
       Priority priority, ResourceRequest request, 
-      List<RMContainer> containers) {
-    // Update consumption and track allocations
-    List<Container> allocatedContainers = 
-        new ArrayList<Container>();
-    for (RMContainer container : containers) {
-      Container c = container.getContainer();
-      // Inform the container
-      container.handle(
-          new RMContainerEvent(c.getId(), RMContainerEventType.START));
-      allocatedContainers.add(c);
-      
-      Resources.addTo(currentConsumption, c.getResource());
-      LOG.debug("allocate: applicationId=" + c.getId().getAppId()
-          + " container=" + c.getId() + " host="
-          + c.getNodeId().toString());
-      
-      // Add it to allContainers list.
-      newlyAllocatedContainers.add(container);
-      liveContainers.put(c.getId(), container);
+      Container container) {
+    
+    // Required sanity check - AM can call 'allocate' to update resource 
+    // request without locking the scheduler, hence we need to check
+    if (getTotalRequiredResources(priority) <= 0) {
+      return null;
     }
     
-    appSchedulingInfo.allocate(type, node, priority, 
-        request, allocatedContainers);
+    // Create RMContainer
+    RMContainer rmContainer = new RMContainerImpl(container, this
+        .getApplicationAttemptId(), node.getNodeID(), this.rmContext
+        .getDispatcher().getEventHandler(), this.rmContext
+        .getContainerAllocationExpirer());
+
+    // Update consumption and track allocations
+    
+    // Inform the container
+    rmContainer.handle(
+        new RMContainerEvent(container.getId(), RMContainerEventType.START));
+
+    Resources.addTo(currentConsumption, container.getResource());
+    LOG.debug("allocate: applicationId=" + container.getId().getAppId()
+        + " container=" + container.getId() + " host="
+        + container.getNodeId().toString());
+
+    // Add it to allContainers list.
+    newlyAllocatedContainers.add(rmContainer);
+    liveContainers.put(container.getId(), rmContainer);
+    
+    appSchedulingInfo.allocate(type, node, priority, request, container);
+    
+    return rmContainer;
   }
   
   synchronized public List<Container> pullNewlyAllocatedContainers() {
@@ -214,18 +230,150 @@ public class SchedulerApp {
         }
       }
     }
+    // TODO - Remove block
+    for (Priority priority : getPriorities()) {
+      Map<String, ResourceRequest> requests = getResourceRequests(priority);
+      if (requests != null) {
+        LOG.info("showRequests:" + " application=" + getApplicationId() + 
+            " headRoom=" + getHeadroom() + 
+            " currentConsumption=" + currentConsumption.getMemory());
+        for (ResourceRequest request : requests.values()) {
+          LOG.info("showRequests:" + " application=" + getApplicationId()
+              + " request=" + request);
+        }
+      }
+    }
+
   }
 
   public synchronized void setAvailableResourceLimit(Resource globalLimit) {
     this.resourceLimit = globalLimit; 
   }
 
+  /**
+   * Look up one of this application's live containers.
+   *
+   * @param id container id
+   * @return the matching entry of <code>liveContainers</code>, or
+   *         <code>null</code> if there is none
+   */
+  public synchronized RMContainer getRMContainer(ContainerId id) {
+    return this.liveContainers.get(id);
+  }
+
+  /**
+   * Reset the scheduling-opportunity counter of <code>priority</code> to
+   * zero.
+   *
+   * @param priority priority whose counter is reset
+   */
+  synchronized public void resetSchedulingOpportunities(Priority priority) {
+    // The previous count is irrelevant here, so no lookup is needed before
+    // overwriting it.
+    this.schedulingOpportunities.put(priority, 0);
+  }
+
+  /**
+   * Increment the scheduling-opportunity counter of <code>priority</code>;
+   * a priority without a counter yet starts from zero.
+   *
+   * @param priority priority whose counter is incremented
+   */
+  synchronized public void addSchedulingOpportunity(Priority priority) {
+    Integer previous = this.schedulingOpportunities.get(priority);
+    int updated = (previous == null) ? 1 : previous + 1;
+    this.schedulingOpportunities.put(priority, updated);
+  }
+
+  /**
+   * Current scheduling-opportunity counter of <code>priority</code>.
+   * A priority without a counter is registered with the value zero.
+   *
+   * @param priority priority to look up
+   * @return the counter value, zero if none was recorded before
+   */
+  synchronized public int getSchedulingOpportunities(Priority priority) {
+    Integer count = this.schedulingOpportunities.get(priority);
+    if (count != null) {
+      return count;
+    }
+    // First query for this priority: store the initial value as well.
+    this.schedulingOpportunities.put(priority, 0);
+    return 0;
+  }
+
+  /**
+   * Number of containers this application has reserved at
+   * <code>priority</code>.
+   *
+   * @param priority priority to look up
+   * @return size of the reservation map for the priority, zero if there is
+   *         no such map
+   */
+  public synchronized int getNumReservedContainers(Priority priority) {
+    Map<NodeId, RMContainer> reservedAtPriority = 
+        this.reservedContainers.get(priority);
+    if (reservedAtPriority == null) {
+      return 0;
+    }
+    return reservedAtPriority.size();
+  }
+
+  /**
+   * Reserve <code>container</code> for this application on
+   * <code>node</code> at the given <code>priority</code>.
+   *
+   * The container is sent a {@link RMContainerReservedEvent} and stored in
+   * the per-priority reservation map under the node's id, replacing any
+   * entry already present for that node. The container's resource is added
+   * to <code>currentReservation</code> only when the node had no entry yet,
+   * so that re-reserving the same node is not counted twice against the
+   * single subtraction done by {@link #unreserve(SchedulerNode, Priority)}.
+   *
+   * @param node node on which the reservation is made
+   * @param priority priority of the reservation
+   * @param rmContainer existing container to re-reserve, or
+   *                    <code>null</code> to have a new
+   *                    {@link RMContainerImpl} created
+   * @param container container whose id and resource describe the
+   *                  reservation
+   * @return the reserved container (newly created if
+   *         <code>rmContainer</code> was <code>null</code>)
+   */
+  public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
+      RMContainer rmContainer, Container container) {
+    // Create RMContainer if necessary
+    if (rmContainer == null) {
+      rmContainer = 
+          new RMContainerImpl(container, getApplicationAttemptId(), 
+              node.getNodeID(), rmContext.getDispatcher().getEventHandler(), 
+              rmContext.getContainerAllocationExpirer());
+    }
+    rmContainer.handle(new RMContainerReservedEvent(container.getId(), 
+        container.getResource(), node.getNodeID(), priority));
+    
+    Map<NodeId, RMContainer> reservedContainers = 
+        this.reservedContainers.get(priority);
+    if (reservedContainers == null) {
+      reservedContainers = new HashMap<NodeId, RMContainer>();
+      this.reservedContainers.put(priority, reservedContainers);
+    }
+    RMContainer previous = 
+        reservedContainers.put(node.getNodeID(), rmContainer);
+    
+    // currentReservation is a final field and must be updated in place.
+    // Resources.add() appears to return a new Resource (compare the use of
+    // Resources.subtract() in getHeadroom()), so its result was being
+    // dropped; addTo() accumulates into the first argument.
+    if (previous == null) {
+      Resources.addTo(currentReservation, container.getResource());
+    }
+    
+    LOG.info("Application " + getApplicationId() 
+        + " reserved container " + rmContainer
+        + " on node " + node + ", currently has " + reservedContainers.size()
+        + " at priority " + priority 
+        + "; currentReservation " + currentReservation);
+    
+    return rmContainer;
+  }
+
+  /**
+   * Drop this application's reservation on <code>node</code> at
+   * <code>priority</code> and subtract the reserved container's resource
+   * from <code>currentReservation</code>.
+   *
+   * A reservation for the node at that priority must exist; otherwise the
+   * lookups below fail with a {@link NullPointerException}.
+   *
+   * @param node node whose reservation is released
+   * @param priority priority of the reservation
+   */
+  public synchronized void unreserve(SchedulerNode node, Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers = 
+        this.reservedContainers.get(priority);
+    RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
+    if (reservedContainers.isEmpty()) {
+      // Last reservation at this priority: drop the now-empty inner map.
+      this.reservedContainers.remove(priority);
+    }
+    
+    // Resources.subtract() returns a new Resource (see getHeadroom()), so
+    // its result was being dropped; subtractFrom() updates
+    // currentReservation in place.
+    Resource resource = reservedContainer.getContainer().getResource();
+    Resources.subtractFrom(currentReservation, resource);
+
+    LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
+        + node + ", currently has " + reservedContainers.size() + " at priority "
+        + priority + "; currentReservation " + currentReservation);
+  }
+
+  /**
+   * Has the application reserved the given <code>node</code> at the
+   * given <code>priority</code>?
+   * @param node node to be checked
+   * @param priority priority of reserved container
+   * @return <code>true</code> if the reservation map for the priority
+   *         contains the node's id, <code>false</code> otherwise
+   */
+  public synchronized boolean isReserved(SchedulerNode node, Priority priority) {
+    Map<NodeId, RMContainer> reservedAtPriority = 
+        this.reservedContainers.get(priority);
+    return reservedAtPriority != null
+        && reservedAtPriority.containsKey(node.getNodeID());
+  }
+
+  /**
+   * Ratio of the resource requests at <code>priority</code> (their number
+   * minus one, never below zero) to the number of cluster nodes. The count
+   * serves as an estimate of the unique hosts and racks required.
+   *
+   * Note that <code>clusterNodes</code> is not checked against zero.
+   *
+   * @param priority priority whose requests are counted
+   * @param clusterNodes number of nodes in the cluster
+   * @return the estimated count divided by <code>clusterNodes</code>
+   */
+  public synchronized float getLocalityWaitFactor(
+      Priority priority, int clusterNodes) {
+    // Estimate: Required unique resources (i.e. hosts + racks)
+    int uniqueLocations = this.getResourceRequests(priority).size() - 1;
+    if (uniqueLocations < 0) {
+      uniqueLocations = 0;
+    }
+    return ((float) uniqueLocations) / clusterNodes;
+  }
+
+  /**
+   * Collect the reserved containers of every priority into one list.
+   *
+   * @return a new list holding all reserved containers; empty if there are
+   *         none
+   */
+  public synchronized List<RMContainer> getAllReservedContainers() {
+    List<RMContainer> all = new ArrayList<RMContainer>();
+    for (Map<NodeId, RMContainer> perNode : this.reservedContainers.values()) {
+      all.addAll(perNode.values());
+    }
+    return all;
+  }
+  
   /**
    * Get available headroom in terms of resources for the application's user.
    * @return available resource headroom
    */
   public synchronized Resource getHeadroom() {
     Resource limit = Resources.subtract(resourceLimit, currentConsumption);
+    Resources.subtractFrom(limit, currentReservation);
 
     // Corner case to deal with applications being slightly over-limit
     if (limit.getMemory() < 0) {
@@ -235,7 +383,8 @@ public class SchedulerApp {
     return limit;
   }
 
-  public synchronized RMContainer getRMContainer(ContainerId id) {
-    return liveContainers.get(id);
+  public Queue getQueue() { // plain accessor: hands back the queue reference as-is; not synchronized, unlike getHeadroom() above
+    return queue;
   }
+
 }

+ 68 - 7
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

@@ -7,14 +7,17 @@ import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 public class SchedulerNode {
@@ -29,9 +32,11 @@ public class SchedulerNode {
 
   private volatile int numContainers;
 
+  private RMContainer reservedContainer;
+  
   /* set of containers that are allocated containers */
-  private final Map<ContainerId, Container> launchedContainers = 
-    new TreeMap<ContainerId, Container>();
+  private final Map<ContainerId, RMContainer> launchedContainers = 
+    new TreeMap<ContainerId, RMContainer>();
   
   private final RMNode rmNode;
 
@@ -70,11 +75,12 @@ public class SchedulerNode {
    * @param containers allocated containers
    */
   public synchronized void allocateContainer(ApplicationId applicationId, 
-      Container container) {
+      RMContainer rmContainer) {
+    Container container = rmContainer.getContainer();
     deductAvailableResource(container.getResource());
     ++numContainers;
     
-    launchedContainers.put(container.getId(), container);
+    launchedContainers.put(container.getId(), rmContainer);
     LOG.info("Allocated container " + container.getId() + 
         " to node " + rmNode.getNodeAddress());
     
@@ -115,7 +121,6 @@ public class SchedulerNode {
     }
 
     /* remove the containers from the nodemanger */
-    
     launchedContainers.remove(container.getId());
     updateResource(container);
 
@@ -157,7 +162,63 @@ public class SchedulerNode {
     return numContainers;
   }
 
-  public synchronized List<Container> getRunningContainers() {
-    return new ArrayList<Container>(launchedContainers.values());
+  public synchronized List<RMContainer> getRunningContainers() {
+    return new ArrayList<RMContainer>(launchedContainers.values());
+  }
+
+  public synchronized void reserveResource(
+      SchedulerApp application, Priority priority, 
+      RMContainer reservedContainer) {
+    /* Check if it's already reserved.
+     * This method records reservedContainer as the node's single
+     * reservation. When a reservation is already present, the two guards
+     * below run first and throw IllegalStateException on failure; if they
+     * pass, the previous reservation is simply overwritten. When the node
+     * is unreserved no guard runs at all.
+     * The application argument is only used in log and error text, and the
+     * priority argument is not read anywhere in this method.
+     */
+    if (this.reservedContainer != null) {
+      // Sanity check: the incoming container must be placed on this very node
+      if (!reservedContainer.getContainer().getNodeId().equals(getNodeID())) {
+        throw new IllegalStateException("Trying to reserve" +
+            " container " + reservedContainer +
+            " on node " + reservedContainer.getReservedNode() + 
+            " when currently" + " reserved resource " + this.reservedContainer +
+            " on node " + this.reservedContainer.getReservedNode());
+      }
+      
+      // Cannot reserve more than one application on a given node! (ownership is compared by the application attempt id carried in each container id)
+      if (!this.reservedContainer.getContainer().getId().getAppAttemptId().equals(
+          reservedContainer.getContainer().getId().getAppAttemptId())) {
+        throw new IllegalStateException("Trying to reserve" +
+        		" container " + reservedContainer + 
+            " for application " + application.getApplicationId() + 
+            " when currently" +
+            " reserved container " + this.reservedContainer +
+            " on node " + this);
+      }
+
+      LOG.info("Updated reserved container " + 
+          reservedContainer.getContainer().getId() + " on node " + 
+          this + " for application " + application);
+    } else {
+      LOG.info("Reserved container " + reservedContainer.getContainer().getId() + 
+          " on node " + this + " for application " + application);
+    }
+    this.reservedContainer = reservedContainer; // reached on both paths: first reservation and same-attempt update
+  }
+
+  public synchronized void unreserveResource(SchedulerApp application) {
+    /* Cannot unreserve for wrong application...
+     * Clears this node's reservation, but only after confirming that the
+     * reserved container's application attempt id equals the caller's;
+     * a mismatch raises IllegalStateException and leaves the reservation
+     * in place.
+     * NOTE(review): reservedContainer is dereferenced below without a null
+     * check, so calling this on a node that holds no reservation fails with
+     * a NullPointerException -- confirm every caller checks
+     * getReservedContainer() first.
+     */
+    ApplicationAttemptId reservedApplication = 
+        reservedContainer.getContainer().getId().getAppAttemptId(); 
+    if (!reservedApplication.equals(
+        application.getApplicationAttemptId())) {
+      throw new IllegalStateException("Trying to unreserve " +  
+          " for application " + application.getApplicationId() + 
+          " when currently reserved " + 
+          " for application " + reservedApplication.getApplicationId() + 
+          " on node " + this);
+    }
+    
+    reservedContainer = null; // node is free for a new reservation from here on
+  }
+
+  public synchronized RMContainer getReservedContainer() { // the node's current reservation, or null when none has been set or it was cleared by unreserveResource
+    return reservedContainer;
   }
+
 }

+ 0 - 139
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSApp.java

@@ -1,139 +0,0 @@
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
-
-public class CSApp extends SchedulerApp {
-
-  private static final Log LOG = LogFactory.getLog(CSApp.class);
-
-  private final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
-
-  final Map<Priority, Set<CSNode>> reservedContainers = new HashMap<Priority, Set<CSNode>>();
-  Map<Priority, Integer> schedulingOpportunities = new HashMap<Priority, Integer>();
-
-  final Resource currentReservation = recordFactory
-      .newRecordInstance(Resource.class);
-
-  /* Reserved containers */
-  private final Comparator<CSNode> nodeComparator = new Comparator<CSNode>() {
-    @Override
-    public int compare(CSNode o1, CSNode o2) {
-      return o1.getNodeID().compareTo(o2.getNodeID());
-    }
-  };
-
-  public CSApp(AppSchedulingInfo appSchedulingInfo, Queue queue) {
-    super(appSchedulingInfo, queue);
-  }
-
-  synchronized public void resetSchedulingOpportunities(Priority priority) {
-    Integer schedulingOpportunities = this.schedulingOpportunities
-        .get(priority);
-    schedulingOpportunities = 0;
-    this.schedulingOpportunities.put(priority, schedulingOpportunities);
-  }
-
-  synchronized public void addSchedulingOpportunity(Priority priority) {
-    Integer schedulingOpportunities = this.schedulingOpportunities
-        .get(priority);
-    if (schedulingOpportunities == null) {
-      schedulingOpportunities = 0;
-    }
-    ++schedulingOpportunities;
-    this.schedulingOpportunities.put(priority, schedulingOpportunities);
-  }
-
-  synchronized public int getSchedulingOpportunities(Priority priority) {
-    Integer schedulingOpportunities = this.schedulingOpportunities
-        .get(priority);
-    if (schedulingOpportunities == null) {
-      schedulingOpportunities = 0;
-      this.schedulingOpportunities.put(priority, schedulingOpportunities);
-    }
-    return schedulingOpportunities;
-  }
-
-  public synchronized int getReservedContainers(Priority priority) {
-    Set<CSNode> reservedNodes = this.reservedContainers.get(priority);
-    return (reservedNodes == null) ? 0 : reservedNodes.size();
-  }
-
-  public synchronized void reserveResource(CSNode node, Priority priority,
-      Resource resource) {
-    Set<CSNode> reservedNodes = this.reservedContainers.get(priority);
-    if (reservedNodes == null) {
-      reservedNodes = new TreeSet<CSNode>(nodeComparator);
-      reservedContainers.put(priority, reservedNodes);
-    }
-    reservedNodes.add(node);
-    Resources.add(currentReservation, resource);
-    LOG.info("Application " + getApplicationId() + " reserved " + resource
-        + " on node " + node + ", currently has " + reservedNodes.size()
-        + " at priority " + priority 
-        + "; currentReservation " + currentReservation);
-    getQueue().getMetrics().reserveResource(
-        getUser(), resource);
-  }
-
-  public synchronized void unreserveResource(CSNode node, Priority priority) {
-    Set<CSNode> reservedNodes = reservedContainers.get(priority);
-    reservedNodes.remove(node);
-    if (reservedNodes.isEmpty()) {
-      this.reservedContainers.remove(priority);
-    }
-    
-    Resource resource = getResource(priority);
-    Resources.subtract(currentReservation, resource);
-
-    LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
-        + node + ", currently has " + reservedNodes.size() + " at priority "
-        + priority + "; currentReservation " + currentReservation);
-    getQueue().getMetrics().unreserveResource(
-        getUser(), node.getReservedResource());
-  }
-
-  public synchronized boolean isReserved(CSNode node, Priority priority) {
-    Set<CSNode> reservedNodes = reservedContainers.get(priority);
-    if (reservedNodes != null) {
-      return reservedNodes.contains(node);
-    }
-    return false;
-  }
-
-  public float getLocalityWaitFactor(Priority priority, int clusterNodes) {
-    // Estimate: Required unique resources (i.e. hosts + racks)
-    int requiredResources = Math.max(this.getResourceRequests(priority).size() - 1, 1);
-    return ((float) requiredResources / clusterNodes);
-  }
-
-  public Map<Priority, Set<CSNode>> getAllReservations() {
-    return new HashMap<Priority, Set<CSNode>>(reservedContainers);
-  }
-
-  public synchronized Resource getHeadroom() {
-    Resource limit = Resources.subtract(super.getHeadroom(),
-        currentReservation);
-
-    // Corner case to deal with applications being slightly over-limit
-    if (limit.getMemory() < 0) {
-      limit.setMemory(0);
-    }
-
-    return limit;
-  }
-}

+ 0 - 70
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSNode.java

@@ -1,70 +0,0 @@
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-
-public class CSNode extends SchedulerNode {
-
-  private static final Log LOG = LogFactory.getLog(CSNode.class);
-
-  private CSApp reservedApplication = null;
-  private Resource reservedResource = null;
-
-  public CSNode(RMNode node) {
-    super(node);
-  }
-
-  public synchronized void reserveResource(
-      CSApp application, Priority priority, Resource resource) {
-    // Check if it's already reserved
-    if (reservedApplication != null) {
-
-      // Cannot reserve more than one application on a given node!
-      if (!reservedApplication.getApplicationId().equals(
-          application.getApplicationId())) {
-        throw new IllegalStateException("Trying to reserve resource " + resource + 
-            " for application " + application.getApplicationId() + 
-            " when currently reserved resource " + reservedResource +
-            " for application " + reservedApplication.getApplicationId() + 
-            " on node " + this);
-      }
-
-      LOG.info("Updated reserved resource " + resource + " on node " + 
-          this + " for application " + application);
-    } else {
-      this.reservedApplication = application;
-      LOG.info("Reserved resource " + resource + " on node " + this + 
-          " for application " + application);
-    }
-    reservedResource = resource;
-  }
-
-  public synchronized void unreserveResource(CSApp application, 
-      Priority priority) {
-    // Cannot unreserve for wrong application...
-    if (!reservedApplication.getApplicationId().equals(
-        application.getApplicationId())) {
-      throw new IllegalStateException("Trying to unreserve " +  
-          " for application " + application.getApplicationId() + 
-          " when currently reserved " + 
-          " for application " + reservedApplication.getApplicationId() + 
-          " on node " + this);
-    }
-    
-    reservedApplication = null;
-    reservedResource = null;
-  }
-
-  public synchronized CSApp getReservedApplication() {
-    return reservedApplication;
-  }
-
-  public synchronized Resource getReservedResource() {
-    return reservedResource;
-  }
-
-}

+ 156 - 159
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -20,14 +20,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
@@ -38,14 +34,13 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Lock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,14 +53,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -97,10 +96,10 @@ implements ResourceScheduler, CapacitySchedulerContext {
     }
   };
 
-  private final Comparator<CSApp> applicationComparator = 
-    new Comparator<CSApp>() {
+  private final Comparator<SchedulerApp> applicationComparator = 
+    new Comparator<SchedulerApp>() {
     @Override
-    public int compare(CSApp a1, CSApp a2) {
+    public int compare(SchedulerApp a1, SchedulerApp a2) {
       return a1.getApplicationId().getId() - a2.getApplicationId().getId();
     }
   };
@@ -111,7 +110,8 @@ implements ResourceScheduler, CapacitySchedulerContext {
 
   private Map<String, Queue> queues = new ConcurrentHashMap<String, Queue>();
 
-  private Map<NodeId, CSNode> csNodes = new ConcurrentHashMap<NodeId, CSNode>();
+  private Map<NodeId, SchedulerNode> nodes = 
+      new ConcurrentHashMap<NodeId, SchedulerNode>();
 
   private Resource clusterResource = 
     RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
@@ -120,8 +120,8 @@ implements ResourceScheduler, CapacitySchedulerContext {
   private Resource minimumAllocation;
   private Resource maximumAllocation;
 
-  private Map<ApplicationAttemptId, CSApp> applications = Collections
-      .synchronizedMap(new TreeMap<ApplicationAttemptId, CSApp>());
+  private Map<ApplicationAttemptId, SchedulerApp> applications = 
+      new ConcurrentHashMap<ApplicationAttemptId, SchedulerApp>();
 
   private boolean initialized = false;
 
@@ -150,11 +150,11 @@ implements ResourceScheduler, CapacitySchedulerContext {
   }
 
   public synchronized Resource getUsedResource(NodeId nodeId) {
-    return csNodes.get(nodeId).getUsedResource();
+    return nodes.get(nodeId).getUsedResource();
   }
 
   public synchronized Resource getAvailableResource(NodeId nodeId) {
-    return csNodes.get(nodeId).getAvailableResource();
+    return nodes.get(nodeId).getAvailableResource();
   }
 
   public synchronized int getNumClusterNodes() {
@@ -316,19 +316,20 @@ implements ResourceScheduler, CapacitySchedulerContext {
 
     AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
         applicationAttemptId, queueName, user, null);
-    CSApp csApp = new CSApp(appSchedulingInfo, queue);
+    SchedulerApp SchedulerApp = 
+        new SchedulerApp(this.rmContext, appSchedulingInfo, queue);
 
     // Submit to the queue
     try {
-      queue.submitApplication(csApp, user, queueName);
+      queue.submitApplication(SchedulerApp, user, queueName);
     } catch (AccessControlException ace) {
       this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppAttemptRejectedEvent(applicationAttemptId, StringUtils
-              .stringifyException(ace)));
+          new RMAppAttemptRejectedEvent(applicationAttemptId, 
+              ace.toString()));
       return;
     }
 
-    applications.put(applicationAttemptId, csApp);
+    applications.put(applicationAttemptId, SchedulerApp);
 
     LOG.info("Application Submission: " + applicationAttemptId + 
         ", user: " + user +
@@ -346,7 +347,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
     LOG.info("Application " + applicationAttemptId + " is done." +
     		" finalState=" + rmAppAttemptFinalState);
     
-    CSApp application = getApplication(applicationAttemptId);
+    SchedulerApp application = getApplication(applicationAttemptId);
 
     if (application == null) {
       //      throw new IOException("Unknown application " + applicationId + 
@@ -356,10 +357,14 @@ implements ResourceScheduler, CapacitySchedulerContext {
     }
     
     // Release all the running containers 
-    processReleasedContainers(application, application.getCurrentContainers());
+    for (RMContainer rmContainer : application.getLiveContainers()) {
+      completedContainer(rmContainer, RMContainerEventType.KILL);
+    }
     
      // Release all reserved containers
-    releaseReservedContainers(application);
+    for (RMContainer rmContainer : application.getAllReservedContainers()) {
+      completedContainer(rmContainer, RMContainerEventType.KILL);
+    }
     
     // Clean up pending requests, metrics etc.
     application.stop(rmAppAttemptFinalState);
@@ -386,7 +391,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
   public Allocation allocate(ApplicationAttemptId applicationAttemptId,
       List<ResourceRequest> ask, List<Container> release) {
 
-    CSApp application = getApplication(applicationAttemptId);
+    SchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.info("Calling allocate on removed " +
           "or non existant application " + applicationAttemptId);
@@ -396,6 +401,12 @@ implements ResourceScheduler, CapacitySchedulerContext {
     // Sanity check
     normalizeRequests(ask);
 
+    // Release containers
+    for (Container releasedContainer : release) {
+      completedContainer(getRMContainer(releasedContainer), 
+          RMContainerEventType.RELEASED);
+    }
+
     synchronized (application) {
 
       LOG.info("DEBUG --- allocate: pre-update" +
@@ -465,154 +476,82 @@ implements ResourceScheduler, CapacitySchedulerContext {
         minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0)));
   }
 
-  @Lock(CapacityScheduler.class)
-  private List<Container> getCompletedContainers(
-      Map<String, List<Container>> allContainers) {
-    if (allContainers == null) {
-      return new ArrayList<Container>();
-    }
-    List<Container> completedContainers = new ArrayList<Container>();
-    // Iterate through the running containers and update their status
-    for (Map.Entry<String, List<Container>> e : 
-      allContainers.entrySet()) {
-      for (Container c: e.getValue()) {
-        if (c.getState() == ContainerState.COMPLETE) {
-          completedContainers.add(c);
-        }
-      }
-    }
-    return completedContainers;
-  }
-
   private synchronized void nodeUpdate(RMNode nm, 
       Map<String,List<Container>> containers ) {
     LOG.info("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
-    SchedulerNode node = this.csNodes.get(nm.getNodeID());
-    //TODO node.statusUpdate(containers);
-
-    // Completed containers
-    processCompletedContainers(getCompletedContainers(containers));
+    
+    SchedulerNode node = getNode(nm.getNodeID());
+    for (List<Container> appContainers : containers.values()) {
+      for (Container container : appContainers) {
+        if (container.getState() == ContainerState.RUNNING) {
+          containerLaunchedOnNode(container, node);
+        } else { // has to be 'COMPLETE'
+          LOG.info("DEBUG --- Container FINISHED: " + container.getId());
+          completedContainer(getRMContainer(container), 
+              RMContainerEventType.FINISHED);
+        }
+      }
+    }
 
-    // Assign new containers
+    // Assign new containers...
     // 1. Check for reserved applications
     // 2. Schedule if there are no reservations
 
-    CSNode csNode = this.csNodes.get(nm.getNodeID());
-
-    CSApp reservedApplication = csNode.getReservedApplication();
-    if (reservedApplication != null) {
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (reservedContainer != null) {
+      SchedulerApp reservedApplication = 
+          getApplication(reservedContainer.getApplicationAttemptId());
+      
       // Try to fulfill the reservation
       LOG.info("Trying to fulfill reservation for application " + 
           reservedApplication.getApplicationId() + " on node: " + nm);
-      LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
-      Resource released = queue.assignContainers(clusterResource, csNode);
       
-      // Is the reservation necessary? If not, release the reservation
-      if (org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.greaterThan(
-          released, org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE)) {
-        queue.completedContainer(clusterResource, null, released, reservedApplication);
-      }
+      LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
+      queue.assignContainers(clusterResource, node);
     }
 
     // Try to schedule more if there are no reservations to fulfill
-    if (csNode.getReservedApplication() == null) {
-      root.assignContainers(clusterResource, csNode);
+    if (node.getReservedContainer() == null) {
+      root.assignContainers(clusterResource, node);
     } else {
       LOG.info("Skipping scheduling since node " + nm + 
           " is reserved by application " + 
-          csNode.getReservedApplication().getApplicationId());
+          node.getReservedContainer().getContainerId().getAppId());
     }
 
   }
 
-  @Lock(CapacityScheduler.class)
-  private void killRunningContainers(List<Container> containers) {
-    for (Container container : containers) {
-      container.setState(ContainerState.COMPLETE);
-      LOG.info("Killing running container " + container.getId());
-      CSApp application = applications.get(container.getId().getAppId());
-      processReleasedContainers(application, Collections.singletonList(container));
-    }
-  }
-  
-  @Lock(Lock.NoLock.class)
-  private void processCompletedContainers(
-      List<Container> completedContainers) {
-    for (Container container: completedContainers) {
-      processSingleCompletedContainer(container);
-    }
-  }
-
-  private void processSingleCompletedContainer(Container container) {
-    CSApp application = getApplication(container.getId().getAppAttemptId());
-
-    // this is possible, since an application can be removed from scheduler 
-    // but the nodemanger is just updating about a completed container.
-    if (application != null) {
-
-      // Inform the queue
-      LeafQueue queue = (LeafQueue)application.getQueue();
-      queue.completedContainer(clusterResource, container, 
-          container.getResource(), application);
-    }
-  }
-
-  @Lock(Lock.NoLock.class)
-  private synchronized void processReleasedContainers(CSApp application,
-      List<Container> releasedContainers) {
-
-    // Inform clusterTracker
-    List<Container> unusedContainers = new ArrayList<Container>();
-    for (Container container : releasedContainers) {
-      if (releaseContainer(
-          application.getApplicationId(), 
-          container)) {
-        unusedContainers.add(container);
-      }
+  private void containerLaunchedOnNode(Container container, SchedulerNode node) {
+    // Get the application for the finished container
+    ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
+    SchedulerApp application = getApplication(applicationAttemptId);
+    if (application == null) {
+      LOG.info("Unknown application: " + applicationAttemptId + 
+          " launched container " + container.getId() +
+          " on node: " + node);
+      return;
     }
-
-    // Update queue capacities
-    processCompletedContainers(unusedContainers);
-  }
-
-  @Lock(CapacityScheduler.class)
-  private void releaseReservedContainers(CSApp application) {
-    LOG.info("Releasing reservations for completed application: " + 
-        application.getApplicationId());
-    Queue queue = queues.get(application.getQueue().getQueueName());
-    Map<Priority, Set<CSNode>> reservations = application.getAllReservations();
-    for (Map.Entry<Priority, Set<CSNode>> e : reservations.entrySet()) {
-      Priority priority = e.getKey();
-      Set<CSNode> reservedNodes = new HashSet<CSNode>(e.getValue());
-      for (CSNode node : reservedNodes) {
-        Resource allocatedResource = 
-          application.getResourceRequest(priority, SchedulerNode.ANY).getCapability();
     
-        application.unreserveResource(node, priority);
-        node.unreserveResource(application, priority);
-        
-        queue.completedContainer(clusterResource, null, allocatedResource, application);
-      }
-    }
-  }
-  
-  @Lock(Lock.NoLock.class)
-  private CSApp getApplication(ApplicationAttemptId applicationAttemptId) {
-    return applications.get(applicationAttemptId);
+    application.containerLaunchedOnNode(container.getId());
   }
 
   @Override
-  public synchronized void handle(SchedulerEvent event) {
+  public void handle(SchedulerEvent event) {
     switch(event.getType()) {
     case NODE_ADDED:
+    {
       NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
       addNode(nodeAddedEvent.getAddedRMNode());
-      break;
+    }
+    break;
     case NODE_REMOVED:
+    {
       NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
       removeNode(nodeRemovedEvent.getRemovedRMNode());
-      break;
+    }
+    break;
     case NODE_UPDATE:
+    {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
       Map<ApplicationId, List<Container>> contAppMapping = nodeUpdatedEvent.getContainers();
       Map<String, List<Container>> conts = new HashMap<String, List<Container>>();
@@ -620,24 +559,37 @@ implements ResourceScheduler, CapacitySchedulerContext {
         conts.put(entry.getKey().toString(), entry.getValue());
       }
       nodeUpdate(nodeUpdatedEvent.getRMNode(), conts);
-      break;
+    }
+    break;
     case APP_ADDED:
+    {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
       addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
           .getQueue(), appAddedEvent.getUser());
-      break;
+    }
+    break;
     case APP_REMOVED:
+    {
       AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
       doneApplication(appRemovedEvent.getApplicationAttemptID(),
           appRemovedEvent.getFinalAttemptState());
-      break;
+    }
+    break;
+    case CONTAINER_EXPIRED:
+    {
+      ContainerExpiredSchedulerEvent containerExpiredEvent = 
+          (ContainerExpiredSchedulerEvent) event;
+      completedContainer(getRMContainer(containerExpiredEvent.getContainer()), 
+          RMContainerEventType.EXPIRE);
+    }
+    break;
     default:
       LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
     }
   }
 
   private synchronized void addNode(RMNode nodeManager) {
-    this.csNodes.put(nodeManager.getNodeID(), new CSNode(nodeManager));
+    this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     ++numNodeManagers;
     LOG.info("Added node " + nodeManager.getNodeAddress() + 
@@ -645,43 +597,86 @@ implements ResourceScheduler, CapacitySchedulerContext {
   }
 
   private synchronized void removeNode(RMNode nodeInfo) {
-    CSNode csNode = this.csNodes.remove(nodeInfo.getNodeID());
+    SchedulerNode node = this.nodes.remove(nodeInfo.getNodeID());
     Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
     --numNodeManagers;
 
     // Remove running containers
-    List<Container> runningContainers = null;//TODO = nodeInfo.getRunningContainers();
-    killRunningContainers(runningContainers);
+    List<RMContainer> runningContainers = node.getRunningContainers();
+    for (RMContainer container : runningContainers) {
+      completedContainer(container, RMContainerEventType.KILL);
+    }
     
     // Remove reservations, if any
-    CSApp reservedApplication = csNode.getReservedApplication();
-    if (reservedApplication != null) {
-      LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
-      Resource released = csNode.getReservedResource();
-      queue.completedContainer(clusterResource, null, released, reservedApplication);
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (reservedContainer != null) {
+      completedContainer(reservedContainer, RMContainerEventType.KILL);
     }
     
     LOG.info("Removed node " + nodeInfo.getNodeAddress() + 
         " clusterResource: " + clusterResource);
   }
   
-  private synchronized boolean releaseContainer(ApplicationId applicationId, 
-      Container container) {
-    // Reap containers
-    LOG.info("Application " + applicationId + " released container " + container);
-    csNodes.get(container.getNodeId()).releaseContainer(container);
-    return true;
+  @Lock(CapacityScheduler.class)
+  private synchronized void completedContainer(RMContainer rmContainer, 
+      RMContainerEventType event) {
+    if (rmContainer == null) {
+      LOG.info("Null container completed...");
+      return;
+    }
+    
+    Container container = rmContainer.getContainer();
+    
+    // Get the application for the finished container
+    ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
+    SchedulerApp application = getApplication(applicationAttemptId);
+    if (application == null) {
+      LOG.info("Container " + container + " of" +
+      		" unknown application " + applicationAttemptId + 
+          " completed with event " + event);
+      return;
+    }
+    
+    // Get the node on which the container was allocated
+    SchedulerNode node = getNode(container.getNodeId());
+    
+    // Inform the queue
+    LeafQueue queue = (LeafQueue)application.getQueue();
+    queue.completedContainer(clusterResource, application, node, 
+        rmContainer, event);
+
+    LOG.info("Application " + applicationAttemptId + 
+        " released container " + container.getId() +
+        " on node: " + node + 
+        " with event: " + event);
+  }
+
+  @Lock(Lock.NoLock.class)
+  private SchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
+    return applications.get(applicationAttemptId);
+  }
+
+  @Lock(Lock.NoLock.class)
+  private SchedulerNode getNode(NodeId nodeId) {
+    return nodes.get(nodeId);
+  }
+
+  private RMContainer getRMContainer(Container container) {
+    ContainerId containerId = container.getId();
+    SchedulerApp application = 
+        getApplication(container.getId().getAppAttemptId());
+    return (application == null) ? null : application.getRMContainer(containerId);
   }
 
   @Override
   @Lock(Lock.NoLock.class)
   public void recover(RMState state) throws Exception {
-    // TODO: VINDOKVFIXME
+    // TODO: VINDOKVFIXME recovery
 //    applications.clear();
 //    for (Map.Entry<ApplicationId, ApplicationInfo> entry : state.getStoredApplications().entrySet()) {
 //      ApplicationId appId = entry.getKey();
 //      ApplicationInfo appInfo = entry.getValue();
-//      CSApp app = applications.get(appId);
+//      SchedulerApp app = applications.get(appId);
 //      app.allocate(appInfo.getContainers());
 //      for (Container c: entry.getValue().getContainers()) {
 //        Queue queue = queues.get(appInfo.getApplicationSubmissionContext().getQueue());
@@ -692,7 +687,9 @@ implements ResourceScheduler, CapacitySchedulerContext {
 
   @Override
   public SchedulerNodeReport getNodeReport(NodeId nodeId) {
-    // TODO Auto-generated method stub
-    return null;
+    SchedulerNode node = getNode(nodeId);
+    return new SchedulerNodeReport(
+        node.getUsedResource(), node.getNumContainers());
   }
+  
 }

+ 226 - 202
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -36,9 +36,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -47,20 +46,18 @@ import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 
@@ -85,8 +82,10 @@ public class LeafQueue implements Queue {
   private float usedCapacity = 0.0f;
   private volatile int numContainers;
 
-  Set<CSApp> applications;
-
+  Set<SchedulerApp> applications;
+  Map<ApplicationAttemptId, SchedulerApp> applicationsMap = 
+      new HashMap<ApplicationAttemptId, SchedulerApp>();
+  
   public final Resource minimumAllocation;
 
   private ContainerTokenSecretManager containerTokenSecretManager;
@@ -109,7 +108,7 @@ public class LeafQueue implements Queue {
   
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, Queue parent, 
-      Comparator<CSApp> applicationComparator, Queue old) {
+      Comparator<SchedulerApp> applicationComparator, Queue old) {
     this.scheduler = cs;
     this.queueName = queueName;
     this.parent = parent;
@@ -158,7 +157,7 @@ public class LeafQueue implements Queue {
         " name=" + queueName + 
         ", fullname=" + getQueuePath());
 
-    this.applications = new TreeSet<CSApp>(applicationComparator);
+    this.applications = new TreeSet<SchedulerApp>(applicationComparator);
   }
 
   private synchronized void setupQueueConfigs(
@@ -362,7 +361,7 @@ public class LeafQueue implements Queue {
   }
 
   @Override
-  public void submitApplication(CSApp application, String userName,
+  public void submitApplication(SchedulerApp application, String userName,
       String queue)  throws AccessControlException {
     // Careful! Locking order is important!
 
@@ -423,10 +422,11 @@ public class LeafQueue implements Queue {
     }
   }
 
-  private synchronized void addApplication(CSApp application, User user) {
+  private synchronized void addApplication(SchedulerApp application, User user) {
     // Accept 
     user.submitApplication();
     applications.add(application);
+    applicationsMap.put(application.getApplicationAttemptId(), application);
 
     LOG.info("Application added -" +
         " appId: " + application.getApplicationId() +
@@ -436,7 +436,7 @@ public class LeafQueue implements Queue {
   }
 
   @Override
-  public void finishApplication(CSApp application, String queue) {
+  public void finishApplication(SchedulerApp application, String queue) {
     // Careful! Locking order is important!
     synchronized (this) {
       removeApplication(application, getUser(application.getUser()));
@@ -446,8 +446,9 @@ public class LeafQueue implements Queue {
     parent.finishApplication(application, queue);
   }
 
-  public synchronized void removeApplication(CSApp application, User user) {
+  public synchronized void removeApplication(SchedulerApp application, User user) {
     applications.remove(application);
+    applicationsMap.remove(application.getApplicationAttemptId());
 
     user.finishApplication();
     if (user.getApplications() == 0) {
@@ -461,24 +462,31 @@ public class LeafQueue implements Queue {
         " #user-applications: " + user.getApplications() + 
         " #queue-applications: " + getNumApplications());
   }
+  
+  private synchronized SchedulerApp getApplication(
+      ApplicationAttemptId applicationAttemptId) {
+    return applicationsMap.get(applicationAttemptId);
+  }
 
   @Override
   public synchronized Resource 
-  assignContainers(Resource clusterResource, CSNode node) {
+  assignContainers(Resource clusterResource, SchedulerNode node) {
 
     LOG.info("DEBUG --- assignContainers:" +
         " node=" + node.getNodeAddress() + 
         " #applications=" + applications.size());
     
     // Check for reserved resources
-    CSApp reservedApplication = node.getReservedApplication();
-    if (reservedApplication != null) {
-      return assignReservedContainers(reservedApplication, node, 
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (reservedContainer != null) {
+      SchedulerApp application = 
+          getApplication(reservedContainer.getApplicationAttemptId());
+      return assignReservedContainer(application, node, reservedContainer, 
           clusterResource);
     }
-
-    // Try to assign containers to applications in fifo order
-    for (CSApp application : applications) {
+    
+    // Try to assign containers to applications in order
+    for (SchedulerApp application : applications) {
       
       LOG.info("DEBUG --- pre-assignContainers for application "
           + application.getApplicationId());
@@ -497,6 +505,7 @@ public class LeafQueue implements Queue {
           }
 
           // Are we going over limits by allocating to this application?
+          
           ResourceRequest required = 
             application.getResourceRequest(priority, RMNode.ANY);
           
@@ -520,7 +529,7 @@ public class LeafQueue implements Queue {
           // Try to schedule
           Resource assigned = 
             assignContainersOnNode(clusterResource, node, application, priority, 
-                false);
+                null);
   
           // Did we schedule or reserve a container?
           if (Resources.greaterThan(assigned, Resources.none())) {
@@ -552,30 +561,21 @@ public class LeafQueue implements Queue {
 
   }
 
-  private synchronized Resource assignReservedContainers(CSApp application, 
-      CSNode node, Resource clusterResource) {
-    synchronized (application) {
-      for (Priority priority : application.getPriorities()) {
-
-        // Do we reserve containers at this 'priority'?
-        if (application.isReserved(node, priority)) {
-          
-          // Do we really need this reservation still?
-          ResourceRequest offSwitchRequest = 
-            application.getResourceRequest(priority, RMNode.ANY);
-          if (offSwitchRequest.getNumContainers() == 0) {
-            // Release
-            unreserve(application, priority, node);
-            return offSwitchRequest.getCapability();
-          }
-
-          // Try to assign if we have sufficient resources
-          assignContainersOnNode(clusterResource, node, application, priority, 
-              true);
-        }
-      }
+  private synchronized Resource assignReservedContainer(SchedulerApp application, 
+      SchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
+    // Do we still need this reservation?
+    Priority priority = rmContainer.getReservedPriority();
+    if (application.getTotalRequiredResources(priority) == 0) {
+      // Release
+      Container container = rmContainer.getContainer();
+      completedContainer(clusterResource, application, node, 
+          rmContainer, RMContainerEventType.RELEASED);
+      return container.getResource();
     }
 
+    // Try to assign if we have sufficient resources
+    assignContainersOnNode(clusterResource, node, application, priority, rmContainer);
+    
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
     return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.NONE;
@@ -599,12 +599,12 @@ public class LeafQueue implements Queue {
     return true;
   }
 
-  private void setUserResourceLimit(CSApp application, Resource resourceLimit) {
+  private void setUserResourceLimit(SchedulerApp application, Resource resourceLimit) {
     application.setAvailableResourceLimit(resourceLimit);
     metrics.setAvailableResourcesToUser(application.getUser(), resourceLimit);
   }
   
-  private Resource computeUserLimit(CSApp application, 
+  private Resource computeUserLimit(SchedulerApp application, 
       Resource clusterResource, Resource required) {
     // What is our current capacity? 
     // * It is equal to the max(required, queue-capacity) if
@@ -688,80 +688,87 @@ public class LeafQueue implements Queue {
     return (a + (b - 1)) / b;
   }
 
-  boolean needContainers(CSApp application, Priority priority) {
-    ResourceRequest offSwitchRequest = 
-      application.getResourceRequest(priority, RMNode.ANY);
-
-    int requiredContainers = offSwitchRequest.getNumContainers();
-    int reservedContainers = application.getReservedContainers(priority);
+  boolean needContainers(SchedulerApp application, Priority priority) {
+    int requiredContainers = application.getTotalRequiredResources(priority);
+    int reservedContainers = application.getNumReservedContainers(priority);
     return ((requiredContainers - reservedContainers) > 0);
   }
 
-  Resource assignContainersOnNode(Resource clusterResource, CSNode node, 
-      CSApp application, Priority priority, boolean reserved) {
+  Resource assignContainersOnNode(Resource clusterResource, SchedulerNode node, 
+      SchedulerApp application, Priority priority, RMContainer reservedContainer) {
 
     Resource assigned = Resources.none();
 
     // Data-local
-    assigned = assignNodeLocalContainers(clusterResource, node, application, priority); 
+    assigned = 
+        assignNodeLocalContainers(clusterResource, node, application, priority,
+            reservedContainer); 
     if (Resources.greaterThan(assigned, Resources.none())) {
       return assigned;
     }
 
     // Rack-local
-    assigned = assignRackLocalContainers(clusterResource, node, application, priority);
+    assigned = 
+        assignRackLocalContainers(clusterResource, node, application, priority, 
+            reservedContainer);
     if (Resources.greaterThan(assigned, Resources.none())) {
-    return assigned;
+      return assigned;
     }
     
     // Off-switch
     return assignOffSwitchContainers(clusterResource, node, application, 
-        priority, reserved);
+        priority, reservedContainer);
   }
 
-  Resource assignNodeLocalContainers(Resource clusterResource, CSNode node, 
-      CSApp application, Priority priority) {
+  Resource assignNodeLocalContainers(Resource clusterResource, SchedulerNode node, 
+      SchedulerApp application, Priority priority, 
+      RMContainer reservedContainer) {
     ResourceRequest request = application.getResourceRequest(priority, node
         .getNodeAddress());
     if (request != null) {
-      if (canAssign(application, priority, node, NodeType.DATA_LOCAL, false)) {
+      if (canAssign(application, priority, node, NodeType.DATA_LOCAL, 
+          reservedContainer)) {
         return assignContainer(clusterResource, node, application, priority, request, 
-            NodeType.DATA_LOCAL);
+            NodeType.DATA_LOCAL, reservedContainer);
       }
     }
     
     return Resources.none();
   }
 
-  Resource assignRackLocalContainers(Resource clusterResource, CSNode node, 
-      CSApp application, Priority priority) {
+  Resource assignRackLocalContainers(Resource clusterResource,  
+      SchedulerNode node, SchedulerApp application, Priority priority,
+      RMContainer reservedContainer) {
     ResourceRequest request = 
       application.getResourceRequest(priority, node.getRackName());
     if (request != null) {
-      if (canAssign(application, priority, node, NodeType.RACK_LOCAL, false)) {
+      if (canAssign(application, priority, node, NodeType.RACK_LOCAL, 
+          reservedContainer)) {
         return assignContainer(clusterResource, node, application, priority, request, 
-            NodeType.RACK_LOCAL);
+            NodeType.RACK_LOCAL, reservedContainer);
       }
     }
     return Resources.none();
   }
 
-  Resource assignOffSwitchContainers(Resource clusterResource, CSNode node, 
-      CSApp application, Priority priority, boolean reserved) {
+  Resource assignOffSwitchContainers(Resource clusterResource, SchedulerNode node, 
+      SchedulerApp application, Priority priority, 
+      RMContainer reservedContainer) {
     ResourceRequest request = 
       application.getResourceRequest(priority, RMNode.ANY);
     if (request != null) {
-      if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reserved)) {
+      if (canAssign(application, priority, node, NodeType.OFF_SWITCH, 
+          reservedContainer)) {
         return assignContainer(clusterResource, node, application, priority, request, 
-            NodeType.OFF_SWITCH);
+            NodeType.OFF_SWITCH, reservedContainer);
       }
     }
     
     return Resources.none();
   }
 
-  boolean canAssign(CSApp application, Priority priority, 
-      CSNode node, NodeType type, boolean reserved) {
+  boolean canAssign(SchedulerApp application, Priority priority, 
+      SchedulerNode node, NodeType type, RMContainer reservedContainer) {
 
     ResourceRequest offSwitchRequest = 
       application.getResourceRequest(priority, RMNode.ANY);
@@ -781,18 +788,18 @@ public class LeafQueue implements Queue {
       
       if (requiredContainers > 0) {
         // No 'delay' for reserved containers
-        if (reserved) {
+        if (reservedContainer != null) {
           return true;
         }
         
-//        // Check if we have waited long enough
-//        if (missedNodes < (requiredContainers * localityWaitFactor)) {
-//          LOG.info("Application " + application.getApplicationId() + 
-//              " has missed " + missedNodes + " opportunities," +
-//              " waitFactor= " + localityWaitFactor + 
-//              " for cluster of size " + scheduler.getNumClusterNodes());
-//          return false;
-//        }
+        // Check if we have waited long enough
+        if (missedNodes < (requiredContainers * localityWaitFactor)) {
+          LOG.info("Application " + application.getApplicationId() + 
+              " has missed " + missedNodes + " opportunities," +
+              " waitFactor= " + localityWaitFactor + 
+              " for cluster of size " + scheduler.getNumClusterNodes());
+          return false;
+        }
         return true;
       }
       return false;
@@ -830,157 +837,162 @@ public class LeafQueue implements Queue {
 
     return false;
   }
-  private Resource assignContainer(Resource clusterResource, CSNode node, 
-      CSApp application, 
-      Priority priority, ResourceRequest request, NodeType type) {
+  
+  private Container getContainer(RMContainer rmContainer, 
+      SchedulerApp application, SchedulerNode node, Resource capability) {
+    if (rmContainer != null) {
+      return rmContainer.getContainer();
+    }
+
+    Container container = 
+          BuilderUtils.newContainer(this.recordFactory,
+              application.getApplicationAttemptId(),
+              application.getNewContainerId(),
+              node.getNodeID(),
+              node.getHttpAddress(), capability);
+
+    // If security is enabled, send the container-tokens too.
+    if (UserGroupInformation.isSecurityEnabled()) {
+      ContainerToken containerToken = 
+          this.recordFactory.newRecordInstance(ContainerToken.class);
+      ContainerTokenIdentifier tokenidentifier =
+          new ContainerTokenIdentifier(container.getId(),
+              container.getNodeId().toString(), container.getResource());
+      containerToken.setIdentifier(
+          ByteBuffer.wrap(tokenidentifier.getBytes()));
+      containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
+      containerToken.setPassword(
+          ByteBuffer.wrap(
+              containerTokenSecretManager.createPassword(tokenidentifier))
+          );
+      containerToken.setService(container.getNodeId().toString());
+      container.setContainerToken(containerToken);
+    }
+
+    return container;
+  }
+  
+  private Resource assignContainer(Resource clusterResource, SchedulerNode node, 
+      SchedulerApp application, Priority priority, 
+      ResourceRequest request, NodeType type, RMContainer rmContainer) {
     LOG.info("DEBUG --- assignContainers:" +
         " node=" + node.getNodeAddress() + 
         " application=" + application.getApplicationId().getId() + 
         " priority=" + priority.getPriority() + 
         " request=" + request + " type=" + type);
     Resource capability = request.getCapability();
-    
-    Resource available = node.getAvailableResource();
-
-    if (available.getMemory() >  0) {
-      
-      int availableContainers = 
-        available.getMemory() / capability.getMemory();         // TODO: A buggy
-                                                                // application
-                                                                // with this
-                                                                // zero would
-                                                                // crash the
-                                                                // scheduler.
-    
-    if (availableContainers > 0) {
-      List<Container> containers =
-        new ArrayList<Container>();
-      Container container =
-         BuilderUtils.newContainer(this.recordFactory,
-                    application.getApplicationAttemptId(),
-                    application.getNewContainerId(),
-                    node.getNodeID(),
-                    node.getHttpAddress(), capability);
-      
-      // If security is enabled, send the container-tokens too.
-      if (UserGroupInformation.isSecurityEnabled()) {
-        ContainerToken containerToken = this.recordFactory.newRecordInstance(ContainerToken.class);
-        ContainerTokenIdentifier tokenidentifier =
-          new ContainerTokenIdentifier(container.getId(),
-              container.getNodeId().toString(), container.getResource());
-        containerToken.setIdentifier(ByteBuffer.wrap(tokenidentifier.getBytes()));
-        containerToken.setKind(ContainerTokenIdentifier.KIND.toString());
-        containerToken.setPassword(ByteBuffer.wrap(containerTokenSecretManager
-              .createPassword(tokenidentifier)));
-          containerToken.setService(container.getNodeId().toString());
-          container.setContainerToken(containerToken);
-        }
 
-        containers.add(container);
+    Resource available = node.getAvailableResource();
 
-        // Allocate
-        allocate(application, type, priority, request, node, containers);
+    assert (available.getMemory() >  0);
 
-        // Did we previously reserve containers at this 'priority'?
-        if (application.isReserved(node, priority)){
-          unreserve(application, priority, node);
-        }
-        
-        LOG.info("assignedContainer" +
-            " application=" + application.getApplicationId() +
-            " container=" + container + 
-            " queue=" + this.toString() + 
-            " util=" + getUtilization() + 
-            " used=" + usedResources + 
-            " cluster=" + clusterResource);
-
-        return container.getResource();
-      } else {
-        // Reserve by 'charging' in advance...
-        reserve(application, priority, node, request.getCapability());
-        
-        LOG.info("Reserved container " + 
-            " application=" + application.getApplicationId() +
-            " resource=" + request.getCapability() + 
-            " queue=" + this.toString() + 
-            " util=" + getUtilization() + 
-            " used=" + usedResources + 
-            " cluster=" + clusterResource);
+    // Create the container if necessary
+    Container container = 
+        getContainer(rmContainer, application, node, capability);
 
-        return request.getCapability();
+    // Can we allocate a container on this node?
+    int availableContainers = 
+        available.getMemory() / capability.getMemory();         
+    if (availableContainers > 0) {
+      // Allocate...
 
+      // Did we previously reserve containers at this 'priority'?
+      if (rmContainer != null){
+        unreserve(application, priority, node, rmContainer);
       }
-    }
 
-    return Resources.none();
-  }
+      // Inform the application
+      RMContainer allocatedContainer = 
+          application.allocate(type, node, priority, request, container);
+      if (allocatedContainer == null) {
+        // Did the application need this resource?
+        return Resources.none();
+      }
 
-  private void allocate(CSApp application, NodeType type, 
-      Priority priority, ResourceRequest request, 
-      CSNode node, List<Container> containers) {
-    // Allocate container to the application
-    // TODO: acm: refactor2 FIXME
-    application.allocate(type, node, priority, request, null);
-
-    for (Container container : containers) {
-      // Create the container and 'start' it.
-      ContainerId containerId = container.getId();
-      RMContext rmContext = this.scheduler.getRMContext();
-      EventHandler eventHandler = rmContext.getDispatcher().getEventHandler();
-      RMContainer rmContainer = new RMContainerImpl(container, application
-          .getApplicationAttemptId(), node.getNodeID(),
-          eventHandler, rmContext.getContainerAllocationExpirer());
-      // TODO: FIX
-//      if (rmContext.getRMContainers().putIfAbsent(containerId, rmContainer) != null) {
-//        LOG.error("Duplicate container addition! ContainerID :  "
-//            + containerId);
-//      } else {
-//        eventHandler.handle(new RMContainerEvent(containerId,
-//            RMContainerEventType.START));
-//      }
+      // Inform the node
+      node.allocateContainer(application.getApplicationId(), 
+          allocatedContainer);
+
+      LOG.info("assignedContainer" +
+          " application=" + application.getApplicationId() +
+          " container=" + container + 
+          " containerId=" + container.getId() + 
+          " queue=" + this + 
+          " util=" + getUtilization() + 
+          " used=" + usedResources + 
+          " cluster=" + clusterResource);
+
+      return container.getResource();
+    } else {
+      // Reserve by 'charging' in advance...
+      reserve(application, priority, node, rmContainer, container);
+
+      LOG.info("Reserved container " + 
+          " application=" + application.getApplicationId() +
+          " resource=" + request.getCapability() + 
+          " queue=" + this.toString() + 
+          " util=" + getUtilization() + 
+          " used=" + usedResources + 
+          " cluster=" + clusterResource);
+
+      return request.getCapability();
     }
-
-    // Inform the NodeManager about the allocation
-    // TODO: acm: refactor2 FIXME
-//    node.allocateContainer(application.getApplicationId(),
-//        containers);
   }
 
-  private void reserve(CSApp application, Priority priority, 
-      CSNode node, Resource resource) {
-    application.reserveResource(node, priority, resource);
-    node.reserveResource(application, priority, resource);
+  private void reserve(SchedulerApp application, Priority priority, 
+      SchedulerNode node, RMContainer rmContainer, Container container) {
+    rmContainer = application.reserve(node, priority, rmContainer, container);
+    node.reserveResource(application, priority, rmContainer);
+    
+    // Update reserved metrics if this is the first reservation
+    if (rmContainer == null) {
+      getMetrics().reserveResource(
+          application.getUser(), container.getResource());
+    }
   }
 
-  private void unreserve(CSApp application, Priority priority, 
-      CSNode node) {
+  private void unreserve(SchedulerApp application, Priority priority, 
+      SchedulerNode node, RMContainer rmContainer) {
     // Done with the reservation?
-    if (application.isReserved(node, priority)) {
-      application.unreserveResource(node, priority);
-      node.unreserveResource(application, priority);
-    }
+    application.unreserve(node, priority);
+    node.unreserveResource(application);
+      
+      // Update reserved metrics
+    getMetrics().unreserveResource(
+        application.getUser(), rmContainer.getContainer().getResource());
   }
 
 
   @Override
   public void completedContainer(Resource clusterResource, 
-      Container container, Resource containerResource, CSApp application) {
+      SchedulerApp application, SchedulerNode node, RMContainer rmContainer, 
+      RMContainerEventType event) {
     if (application != null) {
       // Careful! Locking order is important!
       synchronized (this) {
+
+        Container container = rmContainer.getContainer();
         
-        // Inform the application - this might be an allocated container or
-        // an unfulfilled reservation
-        // TODO: acm: refactor2 FIXME
-        //application.completedContainer(container, containerResource);
-        
+        // Inform the application & the node
+        // Note: It's safe to assume that all state changes to RMContainer
+        // happen under scheduler's lock... 
+        // So, this is, in effect, a transaction across application & node
+        if (rmContainer.getState() == RMContainerState.RESERVED) {
+          application.unreserve(node, rmContainer.getReservedPriority());
+          node.unreserveResource(application);
+        } else {
+          application.containerCompleted(rmContainer, event);
+          node.releaseContainer(container);
+        }
+
+
         // Book-keeping
         releaseResource(clusterResource, 
-            application.getUser(), containerResource);
+            application.getUser(), container.getResource());
 
         LOG.info("completedContainer" +
             " container=" + container +
-            " resource=" + containerResource +
+            " resource=" + container.getResource() +
         		" queue=" + this + 
             " util=" + getUtilization() + 
             " used=" + usedResources + 
@@ -988,29 +1000,41 @@ public class LeafQueue implements Queue {
       }
 
       // Inform the parent queue
-      parent.completedContainer(clusterResource, container, 
-          containerResource, application);
+      parent.completedContainer(clusterResource, application, 
+          node, rmContainer, event);
     }
   }
 
   private synchronized void allocateResource(Resource clusterResource, 
       String userName, Resource resource) {
+    // Update queue metrics
     Resources.addTo(usedResources, resource);
     updateResource(clusterResource);
     ++numContainers;
 
+    // Update user metrics
     User user = getUser(userName);
     user.assignContainer(resource);
+    
+    LOG.info(getQueueName() + 
+        " used=" + usedResources + " numContainers=" + numContainers + 
+        " user=" + userName + " resources=" + user.getConsumedResources());
   }
 
   private synchronized void releaseResource(Resource clusterResource, 
       String userName, Resource resource) {
+    // Update queue metrics
     Resources.subtractFrom(usedResources, resource);
     updateResource(clusterResource);
     --numContainers;
 
+    // Update user metrics
     User user = getUser(userName);
     user.releaseContainer(resource);
+    
+    LOG.info(getQueueName() + 
+        " used=" + usedResources + " numContainers=" + numContainers + 
+        " user=" + userName + " resources=" + user.getConsumedResources());
   }
 
   @Override
@@ -1062,7 +1086,7 @@ public class LeafQueue implements Queue {
 
   @Override
   public void recoverContainer(Resource clusterResource,
-      CSApp application, Container container) {
+      SchedulerApp application, Container container) {
     // Careful! Locking order is important! 
     synchronized (this) {
       allocateResource(clusterResource, application.getUser(), container.getResource());

+ 19 - 15
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -36,7 +36,6 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -46,7 +45,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
 @Private
 @Evolving
@@ -396,7 +399,7 @@ public class ParentQueue implements Queue {
   }
 
   @Override
-  public void submitApplication(CSApp application, String user,
+  public void submitApplication(SchedulerApp application, String user,
       String queue) throws AccessControlException {
     
     synchronized (this) {
@@ -428,7 +431,7 @@ public class ParentQueue implements Queue {
     }
   }
 
-  private synchronized void addApplication(CSApp application, 
+  private synchronized void addApplication(SchedulerApp application, 
       String user) {
   
     ++numApplications;
@@ -441,7 +444,7 @@ public class ParentQueue implements Queue {
   }
   
   @Override
-  public void finishApplication(CSApp application, String queue) {
+  public void finishApplication(SchedulerApp application, String queue) {
     
     synchronized (this) {
       removeApplication(application, application.getUser());
@@ -453,7 +456,7 @@ public class ParentQueue implements Queue {
     }
   }
 
-  public synchronized void removeApplication(CSApp application, 
+  public synchronized void removeApplication(SchedulerApp application, 
       String user) {
     
     --numApplications;
@@ -475,7 +478,7 @@ public class ParentQueue implements Queue {
 
   @Override
   public synchronized Resource assignContainers(
-      Resource clusterResource, CSNode node) {
+      Resource clusterResource, SchedulerNode node) {
     Resource assigned = Resources.createResource(0);
 
     while (canAssign(node)) {
@@ -539,14 +542,14 @@ public class ParentQueue implements Queue {
 
   }
   
-  private boolean canAssign(CSNode node) {
-    return (node.getReservedApplication() == null) && 
+  private boolean canAssign(SchedulerNode node) {
+    return (node.getReservedContainer() == null) && 
         Resources.greaterThanOrEqual(node.getAvailableResource(), 
                                      minimumAllocation);
   }
   
   synchronized Resource assignContainersToChildQueues(Resource cluster, 
-      CSNode node) {
+      SchedulerNode node) {
     Resource assigned = Resources.createResource(0);
     
     printChildQueues();
@@ -588,13 +591,14 @@ public class ParentQueue implements Queue {
   
   @Override
   public void completedContainer(Resource clusterResource,
-      Container container, Resource containerResource, 
-      CSApp application) {
+      SchedulerApp application, SchedulerNode node, 
+      RMContainer rmContainer, RMContainerEventType event) {
     if (application != null) {
       // Careful! Locking order is important!
       // Book keeping
       synchronized (this) {
-        releaseResource(clusterResource, containerResource);
+        releaseResource(clusterResource, 
+            rmContainer.getContainer().getResource());
 
         LOG.info("completedContainer" +
             " queue=" + getQueueName() + 
@@ -605,8 +609,8 @@ public class ParentQueue implements Queue {
 
       // Inform the parent
       if (parent != null) {
-        parent.completedContainer(clusterResource, container, 
-            containerResource, application);
+        parent.completedContainer(clusterResource, application, 
+            node, rmContainer, event);
       }    
     }
   }
@@ -646,7 +650,7 @@ public class ParentQueue implements Queue {
   
   @Override
   public void recoverContainer(Resource clusterResource,
-      CSApp application, Container container) {
+      SchedulerApp application, Container container) {
     // Careful! Locking order is important! 
     synchronized (this) {
       allocateResource(clusterResource, container.getResource());

+ 13 - 11
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java

@@ -26,12 +26,13 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
 /**
  * Queue represents a node in the tree of 
@@ -138,7 +139,7 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * @param user user who submitted the application
    * @param queue queue to which the application is submitted
    */
-  public void submitApplication(CSApp application, String user, 
+  public void submitApplication(SchedulerApp application, String user, 
       String queue) 
   throws AccessControlException;
   
@@ -147,7 +148,7 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * @param application
    * @param queue application queue 
    */
-  public void finishApplication(CSApp application, String queue);
+  public void finishApplication(SchedulerApp application, String queue);
   
   /**
    * Assign containers to applications in the queue or its children (if any).
@@ -155,19 +156,20 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * @param node node on which resources are available
    * @return
    */
-  public Resource assignContainers(Resource clusterResource, CSNode node);
+  public Resource assignContainers(Resource clusterResource, SchedulerNode node);
   
   /**
    * A container assigned to the queue has completed.
    * @param clusterResource the resource of the cluster
+   * @param application application to which the container was assigned
+   * @param node node on which the container completed
    * @param container completed container; a reservation is identified by 
    *                  its RESERVED state, not by <code>null</code>
-   * @param containerResource allocated resource
-   * @param application application to which the container was assigned
+   * @param event event to be sent to the container
    */
   public void completedContainer(Resource clusterResource,
-      Container container, Resource containerResource, 
-      CSApp application);
+      SchedulerApp application, SchedulerNode node, 
+      RMContainer container, RMContainerEventType event);
 
   /**
    * Get the number of applications in the queue.
@@ -196,6 +198,6 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * @param application the application for which the container was allocated
    * @param container the container that was recovered.
    */
-  public void recoverContainer(Resource clusterResource, CSApp application, 
+  public void recoverContainer(Resource clusterResource, SchedulerApp application, 
       Container container);
 }

+ 38 - 25
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.Lock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -62,7 +63,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@@ -225,7 +225,8 @@ public class FifoScheduler implements ResourceScheduler {
 
     // Release containers
     for (Container releasedContainer : release) {
-      containerCompleted(releasedContainer, RMContainerEventType.RELEASED);
+      containerCompleted(getRMContainer(releasedContainer), 
+          RMContainerEventType.RELEASED);
     }
 
     if (!ask.isEmpty()) {
@@ -261,8 +262,9 @@ public class FifoScheduler implements ResourceScheduler {
   private void normalizeRequest(ResourceRequest ask) {
     int memory = ask.getCapability().getMemory();
     // FIXME: TestApplicationCleanup is relying on unnormalized behavior.
-    memory = MINIMUM_MEMORY *
-    ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
+    memory = 
+        MINIMUM_MEMORY * 
+        ((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
     ask.setCapability(Resources.createResource(memory));
   }
 
@@ -279,12 +281,12 @@ public class FifoScheduler implements ResourceScheduler {
       String queueName, String user) {
     AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
         appAttemptId, queueName, user, null);
-    SchedulerApp schedulerApp = new SchedulerApp(appSchedulingInfo,
-        DEFAULT_QUEUE);
+    SchedulerApp schedulerApp = 
+        new SchedulerApp(this.rmContext, appSchedulingInfo, DEFAULT_QUEUE);
     applications.put(appAttemptId, schedulerApp);
     metrics.submitApp(user);
-    LOG.info("Application Submission: " + appAttemptId.getApplicationId() + " from " + user + 
-        ", currently active: " + applications.size());
+    LOG.info("Application Submission: " + appAttemptId.getApplicationId() + 
+        " from " + user + ", currently active: " + applications.size());
     rmContext.getDispatcher().getEventHandler().handle(
         new RMAppAttemptEvent(appAttemptId,
             RMAppAttemptEventType.APP_ACCEPTED));
@@ -302,7 +304,7 @@ public class FifoScheduler implements ResourceScheduler {
 
     // Kill all 'live' containers
     for (RMContainer container : application.getLiveContainers()) {
-      containerCompleted(container.getContainer(), RMContainerEventType.KILL);
+      containerCompleted(container, RMContainerEventType.KILL);
     }
 
     // Clean up pending requests, metrics etc.
@@ -428,7 +430,7 @@ public class FifoScheduler implements ResourceScheduler {
                 NodeType.DATA_LOCAL), 
                 request.getNumContainers());
       assignedContainers = 
-        assignContainers(node, application, priority, 
+        assignContainer(node, application, priority, 
             assignableContainers, request, NodeType.DATA_LOCAL);
     }
     return assignedContainers;
@@ -446,7 +448,7 @@ public class FifoScheduler implements ResourceScheduler {
                 NodeType.RACK_LOCAL), 
                 request.getNumContainers());
       assignedContainers = 
-        assignContainers(node, application, priority, 
+        assignContainer(node, application, priority, 
             assignableContainers, request, NodeType.RACK_LOCAL);
     }
     return assignedContainers;
@@ -459,13 +461,13 @@ public class FifoScheduler implements ResourceScheduler {
       application.getResourceRequest(priority, SchedulerNode.ANY);
     if (request != null) {
       assignedContainers = 
-        assignContainers(node, application, priority, 
+        assignContainer(node, application, priority, 
             request.getNumContainers(), request, NodeType.OFF_SWITCH);
     }
     return assignedContainers;
   }
 
-  private int assignContainers(SchedulerNode node, SchedulerApp application, 
+  private int assignContainer(SchedulerNode node, SchedulerApp application, 
       Priority priority, int assignableContainers, 
       ResourceRequest request, NodeType type) {
     LOG.debug("assignContainers:" +
@@ -495,10 +497,6 @@ public class FifoScheduler implements ResourceScheduler {
                 application.getNewContainerId(),
                 node.getRMNode().getNodeID(),
                 node.getRMNode().getHttpAddress(), capability);
-        RMContainer rmContainer = new RMContainerImpl(container, application
-            .getApplicationAttemptId(), node.getNodeID(), this.rmContext
-            .getDispatcher().getEventHandler(), this.rmContext
-            .getContainerAllocationExpirer());
         
         // If security is enabled, send the container-tokens too.
         if (UserGroupInformation.isSecurityEnabled()) {
@@ -518,10 +516,14 @@ public class FifoScheduler implements ResourceScheduler {
         }
         
         // Allocate!
-        application.allocate(type, node, priority, request, 
-            Collections.singletonList(rmContainer));
+        
+        // Inform the application
+        RMContainer rmContainer =
+            application.allocate(type, node, priority, request, container);
+        
+        // Inform the node
         node.allocateContainer(application.getApplicationId(), 
-            container);
+            rmContainer);
       }
       
       // Update total usage
@@ -541,7 +543,8 @@ public class FifoScheduler implements ResourceScheduler {
         if (container.getState() == ContainerState.RUNNING) {
           containerLaunchedOnNode(container, node);
         } else { // has to COMPLETE
-          containerCompleted(container, RMContainerEventType.FINISHED);
+          containerCompleted(getRMContainer(container), 
+              RMContainerEventType.FINISHED);
         }
       }
     }
@@ -607,7 +610,7 @@ public class FifoScheduler implements ResourceScheduler {
     {
       ContainerExpiredSchedulerEvent containerExpiredEvent = 
           (ContainerExpiredSchedulerEvent) event;
-      containerCompleted(containerExpiredEvent.getContainer(), 
+      containerCompleted(getRMContainer(containerExpiredEvent.getContainer()), 
           RMContainerEventType.EXPIRE);
     }
     break;
@@ -631,9 +634,10 @@ public class FifoScheduler implements ResourceScheduler {
   }
 
   @Lock(FifoScheduler.class)
-  private synchronized void containerCompleted(Container container,
+  private synchronized void containerCompleted(RMContainer rmContainer,
       RMContainerEventType event) {
     // Get the application for the finished container
+    Container container = rmContainer.getContainer();
     ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
     SchedulerApp application = getApplication(applicationAttemptId);
     
@@ -649,7 +653,7 @@ public class FifoScheduler implements ResourceScheduler {
     }
 
     // Inform the application
-    application.containerCompleted(container, event);
+    application.containerCompleted(rmContainer, event);
 
     // Inform the node
     node.releaseContainer(container);
@@ -667,7 +671,7 @@ public class FifoScheduler implements ResourceScheduler {
   private synchronized void removeNode(RMNode nodeInfo) {
     SchedulerNode node = getNode(nodeInfo.getNodeID());
     // Kill running containers
-    for(Container container : node.getRunningContainers()) {
+    for(RMContainer container : node.getRunningContainers()) {
       containerCompleted(container, RMContainerEventType.KILL);
     }
     
@@ -696,6 +700,7 @@ public class FifoScheduler implements ResourceScheduler {
 
   @Override
   public void recover(RMState state) {
+    // TODO fix recovery
 //    for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet()) {
 //      ApplicationId appId = entry.getKey();
 //      ApplicationInfo appInfo = entry.getValue();
@@ -710,4 +715,12 @@ public class FifoScheduler implements ResourceScheduler {
     return new SchedulerNodeReport(
         node.getUsedResource(), node.getNumContainers());
   }
+  
+  private RMContainer getRMContainer(Container container) {
+    ContainerId containerId = container.getId();
+    SchedulerApp application = 
+        getApplication(container.getId().getAppAttemptId());
+    return application.getRMContainer(containerId);
+  }
+
 }