Browse Source

More tests from Sharad; multinode scheduling and application-cleanup.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1153434 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 14 years ago
parent
commit
e0faee55cf
11 changed files with 266 additions and 45 deletions
  1. 4 5
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java
  2. 4 0
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
  3. 33 3
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  4. 4 0
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
  5. 20 3
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
  6. 3 2
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
  7. 5 12
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
  8. 1 1
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  9. 44 17
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
  10. 77 0
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
  11. 71 2
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java

+ 4 - 5
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ams/ApplicationMasterService.java

@@ -190,16 +190,15 @@ AMRMProtocol, EventHandler<ApplicationMasterServiceEvent> {
       throws YarnRemoteException {
       throws YarnRemoteException {
 
 
     ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
     ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
-    ApplicationId applicationId = appAttemptId.getApplicationId();
 
 
     this.amLivelinessMonitor.receivedPing(appAttemptId);
     this.amLivelinessMonitor.receivedPing(appAttemptId);
 
 
     /* check if its in cache */
     /* check if its in cache */
     AllocateResponse allocateResponse = recordFactory
     AllocateResponse allocateResponse = recordFactory
         .newRecordInstance(AllocateResponse.class);
         .newRecordInstance(AllocateResponse.class);
-    AMResponse lastResponse = responseMap.get(applicationId);
+    AMResponse lastResponse = responseMap.get(appAttemptId);
     if (lastResponse == null) {
     if (lastResponse == null) {
-      LOG.error("Application doesnt exist in cache " + applicationId);
+      LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
       allocateResponse.setAMResponse(reboot);
       allocateResponse.setAMResponse(reboot);
       return allocateResponse;
       return allocateResponse;
     }
     }
@@ -208,7 +207,7 @@ AMRMProtocol, EventHandler<ApplicationMasterServiceEvent> {
       allocateResponse.setAMResponse(lastResponse);
       allocateResponse.setAMResponse(lastResponse);
       return allocateResponse;
       return allocateResponse;
     } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
     } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
-      LOG.error("Invalid responseid from application " + applicationId);
+      LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
       // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
       // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
       allocateResponse.setAMResponse(reboot);
       allocateResponse.setAMResponse(reboot);
       return allocateResponse;
       return allocateResponse;
@@ -237,7 +236,7 @@ AMRMProtocol, EventHandler<ApplicationMasterServiceEvent> {
                 RMContainerEventType.RELEASED));
                 RMContainerEventType.RELEASED));
       }
       }
 
 
-      RMApp app = this.rmContext.getRMApps().get(applicationId);
+      RMApp app = this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
       RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
       RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
 
 
       // Get the list of finished containers.
       // Get the list of finished containers.

+ 4 - 0
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java

@@ -1,8 +1,11 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 
+import java.util.Set;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -37,4 +40,5 @@ public interface RMApp extends EventHandler<RMAppEvent>{
 
 
   StringBuilder getDiagnostics();
   StringBuilder getDiagnostics();
 
 
+  Set<NodeId> getRanNodes();
 }
 }

+ 33 - 3
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -1,8 +1,10 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 
 import java.util.EnumSet;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -16,6 +18,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
 import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -25,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -96,7 +100,7 @@ public class RMAppImpl implements RMApp {
 
 
      // Transitions from RUNNING state
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
     .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
-        RMAppEventType.ATTEMPT_FINISHED)
+        RMAppEventType.ATTEMPT_FINISHED, new FinalTransition())
     .addTransition(RMAppState.RUNNING,
     .addTransition(RMAppState.RUNNING,
         EnumSet.of(RMAppState.RESTARTING, RMAppState.FAILED),
         EnumSet.of(RMAppState.RESTARTING, RMAppState.FAILED),
         RMAppEventType.ATTEMPT_FAILED,
         RMAppEventType.ATTEMPT_FAILED,
@@ -316,6 +320,21 @@ public class RMAppImpl implements RMApp {
     }
     }
   }
   }
 
 
+  @Override
+  public Set<NodeId> getRanNodes() {
+    this.readLock.lock();
+
+    try {
+      Set<NodeId> ranNodes = new HashSet<NodeId>();
+      for (RMAppAttempt attempt : attempts.values()) {
+        ranNodes.addAll(attempt.getRanNodes());
+      }
+      return ranNodes;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
   @Override
   @Override
   public void handle(RMAppEvent event) {
   public void handle(RMAppEvent event) {
 
 
@@ -381,8 +400,19 @@ public class RMAppImpl implements RMApp {
     };
     };
   }
   }
 
 
-  private static final class AttemptFailedTransition implements
-      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
+  private static class FinalTransition extends RMAppTransition {
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      Set<NodeId> ranNodes = app.getRanNodes();
+      for (NodeId nodeId : ranNodes) {
+        app.dispatcher.getEventHandler().handle(
+            new RMNodeCleanAppEvent(nodeId, app.applicationId));
+      }
+    };
+  }
+
+  private static final class AttemptFailedTransition extends FinalTransition 
+    implements
+      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState>  {
 
 
     private final RMAppState initialState;
     private final RMAppState initialState;
 
 

+ 4 - 0
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java

@@ -1,10 +1,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
 
 
 import java.util.List;
 import java.util.List;
+import java.util.Set;
 
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 
 public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent>{
 public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent>{
@@ -25,6 +27,8 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent>{
 
 
   float getProgress();
   float getProgress();
 
 
+  Set<NodeId> getRanNodes();
+
   List<Container> pullJustFinishedContainers();
   List<Container> pullJustFinishedContainers();
 
 
   List<Container> pullNewlyAllocatedContainers();
   List<Container> pullNewlyAllocatedContainers();

+ 20 - 3
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -4,8 +4,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -16,6 +18,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -79,8 +82,13 @@ public class RMAppAttemptImpl implements RMAppAttempt {
   private Map<ContainerId, ContainerId> liveContainers
   private Map<ContainerId, ContainerId> liveContainers
     = new HashMap<ContainerId, ContainerId>();
     = new HashMap<ContainerId, ContainerId>();
 
 
-  private List<Container> newlyAllocatedContainers;
-  private List<ContainerId> justFinishedContainers;
+  //nodes on while this attempt's containers ran
+  private final Set<NodeId> ranNodes = 
+    new HashSet<NodeId>();
+  private final List<Container> newlyAllocatedContainers = 
+    new ArrayList<Container>();
+  private final List<ContainerId> justFinishedContainers = 
+    new ArrayList<ContainerId>();
   private Container masterContainer;
   private Container masterContainer;
 
 
   private float progress = 0;
   private float progress = 0;
@@ -317,7 +325,7 @@ public class RMAppAttemptImpl implements RMAppAttempt {
     this.writeLock.lock();
     this.writeLock.lock();
 
 
     try {
     try {
-      return null;  // TODO: Should just be ContainerId 
+      return new ArrayList<Container>();  // TODO: Should just be ContainerId 
 //      List<Container> returnList = new ArrayList<Container>(
 //      List<Container> returnList = new ArrayList<Container>(
 //          this.justFinishedContainers.size());
 //          this.justFinishedContainers.size());
 //      returnList.addAll(this.justFinishedContainers);
 //      returnList.addAll(this.justFinishedContainers);
@@ -328,6 +336,11 @@ public class RMAppAttemptImpl implements RMAppAttempt {
     }
     }
   }
   }
 
 
+  @Override
+  public Set<NodeId> getRanNodes() {
+    return ranNodes;
+  }
+
   @Override
   @Override
   public List<Container> pullNewlyAllocatedContainers() {
   public List<Container> pullNewlyAllocatedContainers() {
     this.writeLock.lock();
     this.writeLock.lock();
@@ -336,6 +349,10 @@ public class RMAppAttemptImpl implements RMAppAttempt {
       List<Container> returnList = new ArrayList<Container>(
       List<Container> returnList = new ArrayList<Container>(
           this.newlyAllocatedContainers.size());
           this.newlyAllocatedContainers.size());
       returnList.addAll(this.newlyAllocatedContainers);
       returnList.addAll(this.newlyAllocatedContainers);
+      for (Container cont : newlyAllocatedContainers) {
+        ranNodes.add(cont.getNodeId());//add to the nodes set when these containers
+        //are pulled by AM
+      }
       this.newlyAllocatedContainers.clear();
       this.newlyAllocatedContainers.clear();
       return returnList;
       return returnList;
     } finally {
     } finally {

+ 3 - 2
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -373,11 +373,12 @@ public class AppSchedulingInfo {
     for (Container container : containers) {
     for (Container container : containers) {
 
 
       allocated.add(container);
       allocated.add(container);
-      try {
+      //TODO: fixme sharad
+     /* try {
         store.storeContainer(container);
         store.storeContainer(container);
       } catch (IOException ie) {
       } catch (IOException ie) {
         // TODO fix this. we shouldnt ignore
         // TODO fix this. we shouldnt ignore
-      }
+      }*/
       LOG.debug("allocate: applicationId=" + applicationId + " container="
       LOG.debug("allocate: applicationId=" + applicationId + " container="
           + container.getId() + " host="
           + container.getId() + " host="
           + container.getNodeId().toString());
           + container.getNodeId().toString());

+ 5 - 12
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java

@@ -8,7 +8,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationState;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -16,9 +15,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 
 public class SchedulerApp {
 public class SchedulerApp {
 
 
@@ -81,12 +78,6 @@ public class SchedulerApp {
     return this.appSchedulingInfo.getResource(priority);
     return this.appSchedulingInfo.getResource(priority);
   }
   }
 
 
-  public void allocate(NodeType type, SchedulerNode node, Priority priority,
-      ResourceRequest request, List<Container> containers) {
-    this.appSchedulingInfo
-        .allocate(type, node, priority, request, containers);
-  }
-
   public boolean isPending() {
   public boolean isPending() {
     return this.appSchedulingInfo.isPending();
     return this.appSchedulingInfo.isPending();
   }
   }
@@ -113,7 +104,8 @@ public class SchedulerApp {
     Resources.subtractFrom(currentConsumption, containerResource);
     Resources.subtractFrom(currentConsumption, containerResource);
   }
   }
 
 
-  synchronized public void allocate(List<Container> containers) {
+  synchronized public void allocate(NodeType type, SchedulerNode node,
+      Priority priority, ResourceRequest request, List<Container> containers) {
     // Update consumption and track allocations
     // Update consumption and track allocations
     for (Container container : containers) {
     for (Container container : containers) {
       Resources.addTo(currentConsumption, container.getResource());
       Resources.addTo(currentConsumption, container.getResource());
@@ -121,6 +113,7 @@ public class SchedulerApp {
           + " container=" + container.getId() + " host="
           + " container=" + container.getId() + " host="
           + container.getNodeId().toString());
           + container.getNodeId().toString());
     }
     }
+    appSchedulingInfo.allocate(type, node, priority, request, containers);
   }
   }
 
 
   public Resource getCurrentConsumption() {
   public Resource getCurrentConsumption() {
@@ -133,8 +126,8 @@ public class SchedulerApp {
         Map<String, ResourceRequest> requests = getResourceRequests(priority);
         Map<String, ResourceRequest> requests = getResourceRequests(priority);
         if (requests != null) {
         if (requests != null) {
           LOG.debug("showRequests:" + " application=" + getApplicationId() + 
           LOG.debug("showRequests:" + " application=" + getApplicationId() + 
-              " available=" + getHeadroom() + 
-              " current=" + currentConsumption);
+              " headRoom=" + getHeadroom() + 
+              " currentConsumption=" + currentConsumption.getMemory());
           for (ResourceRequest request : requests.values()) {
           for (ResourceRequest request : requests.values()) {
             LOG.debug("showRequests:" + " application=" + getApplicationId()
             LOG.debug("showRequests:" + " application=" + getApplicationId()
                 + " request=" + request);
                 + " request=" + request);

+ 1 - 1
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -494,7 +494,7 @@ public class FifoScheduler implements ResourceScheduler {
         }
         }
         containers.add(container);
         containers.add(container);
       }
       }
-      application.allocate(containers);
+      application.allocate(type, node, priority, request, containers);
       addAllocatedContainers(node, application.getApplicationAttemptId(),
       addAllocatedContainers(node, application.getApplicationAttemptId(),
           containers);
           containers);
       Resources.addTo(usedResource,
       Resources.addTo(usedResource,

+ 44 - 17
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
@@ -52,6 +53,8 @@ import org.apache.log4j.Logger;
 public class MockRM extends ResourceManager {
 public class MockRM extends ResourceManager {
 
 
   private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  private Map<NodeId, Integer> responseIds = new HashMap<NodeId, Integer>();
+  private Map<ApplicationAttemptId, Integer> AMResponseIds = new HashMap<ApplicationAttemptId, Integer>();
 
 
   public MockRM() {
   public MockRM() {
     this(new Configuration());
     this(new Configuration());
@@ -140,6 +143,7 @@ public class MockRM extends ResourceManager {
   //from AMS
   //from AMS
   public void registerAppAttempt(ApplicationAttemptId attemptId) throws Exception {
   public void registerAppAttempt(ApplicationAttemptId attemptId) throws Exception {
     waitForState(attemptId, RMAppAttemptState.LAUNCHED);
     waitForState(attemptId, RMAppAttemptState.LAUNCHED);
+    AMResponseIds.put(attemptId, 0);
     RegisterApplicationMasterRequest req = recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
     RegisterApplicationMasterRequest req = recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
     req.setApplicationAttemptId(attemptId);
     req.setApplicationAttemptId(attemptId);
     req.setHost("");
     req.setHost("");
@@ -148,32 +152,51 @@ public class MockRM extends ResourceManager {
     masterService.registerApplicationMaster(req);
     masterService.registerApplicationMaster(req);
   }
   }
 
 
-  public List<Container> allocateFromAM(ApplicationAttemptId attemptId, 
+  public List<Container> allocate(ApplicationAttemptId attemptId, 
       String host, int memory, int numContainers, 
       String host, int memory, int numContainers, 
       List<ContainerId> releases) throws Exception {
       List<ContainerId> releases) throws Exception {
+    List reqs = createReq(host, memory, 1, numContainers);
+    List<Container> toRelease = new ArrayList<Container>();
+    for (ContainerId id : releases) {
+      Container cont = recordFactory.newRecordInstance(Container.class);
+      cont.setId(id);
+      //TOOD: set all fields
+    }
+    return allocate(attemptId, toRelease, reqs);
+  }
+
+  private List<ResourceRequest> createReq(String host, int memory, int priority, 
+      int containers) throws Exception {
+    ResourceRequest hostReq = createResourceReq(host, memory, priority, 
+        containers);
+    ResourceRequest rackReq = createResourceReq("default-rack", memory, 
+        priority, containers);
+    ResourceRequest offRackReq = createResourceReq("*", memory, priority, 
+        containers);
+    return Arrays.asList(new ResourceRequest[] {hostReq, rackReq, offRackReq});
+    
+  }
+  private ResourceRequest createResourceReq(String resource, int memory, int priority, 
+      int containers) throws Exception {
     ResourceRequest req = recordFactory.newRecordInstance(ResourceRequest.class);
     ResourceRequest req = recordFactory.newRecordInstance(ResourceRequest.class);
-    req.setHostName(host);
-    req.setNumContainers(numContainers);
+    req.setHostName(resource);
+    req.setNumContainers(containers);
     Priority pri = recordFactory.newRecordInstance(Priority.class);
     Priority pri = recordFactory.newRecordInstance(Priority.class);
     pri.setPriority(1);
     pri.setPriority(1);
     req.setPriority(pri);
     req.setPriority(pri);
     Resource capability = recordFactory.newRecordInstance(Resource.class);
     Resource capability = recordFactory.newRecordInstance(Resource.class);
     capability.setMemory(memory);
     capability.setMemory(memory);
     req.setCapability(capability);
     req.setCapability(capability);
-    List<Container> toRelease = new ArrayList<Container>();
-    for (ContainerId id : releases) {
-      Container cont = recordFactory.newRecordInstance(Container.class);
-      cont.setId(id);
-      //TOOD: set all fields
-    }
-    return allocateFromAM(attemptId, toRelease, 
-        Arrays.asList(new ResourceRequest[] {req}));
+    return req;
   }
   }
 
 
-  public List<Container> allocateFromAM(ApplicationAttemptId attemptId, 
+  public List<Container> allocate(ApplicationAttemptId attemptId, 
       List<Container> releases, List<ResourceRequest> resourceRequest) 
       List<Container> releases, List<ResourceRequest> resourceRequest) 
       throws Exception {
       throws Exception {
     AllocateRequest req = recordFactory.newRecordInstance(AllocateRequest.class);
     AllocateRequest req = recordFactory.newRecordInstance(AllocateRequest.class);
+    int responseId = AMResponseIds.remove(attemptId) + 1;
+    AMResponseIds.put(attemptId, responseId);
+    req.setResponseId(responseId);
     req.setApplicationAttemptId(attemptId);
     req.setApplicationAttemptId(attemptId);
     req.addAllAsks(resourceRequest);
     req.addAllAsks(resourceRequest);
     req.addAllReleases(releases);
     req.addAllReleases(releases);
@@ -182,6 +205,7 @@ public class MockRM extends ResourceManager {
   }
   }
 
 
   public void unregisterAppAttempt(ApplicationAttemptId attemptId) throws Exception {
   public void unregisterAppAttempt(ApplicationAttemptId attemptId) throws Exception {
+    AMResponseIds.remove(attemptId);
     waitForState(attemptId, RMAppAttemptState.RUNNING);
     waitForState(attemptId, RMAppAttemptState.RUNNING);
     FinishApplicationMasterRequest req = recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
     FinishApplicationMasterRequest req = recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
     req.setAppAttemptId(attemptId);
     req.setAppAttemptId(attemptId);
@@ -211,17 +235,18 @@ public class MockRM extends ResourceManager {
     resource.setMemory(memory);
     resource.setMemory(memory);
     req.setResource(resource);
     req.setResource(resource);
     getResourceTrackerService().registerNodeManager(req);
     getResourceTrackerService().registerNodeManager(req);
+    responseIds.put(nodeId, 0);
   }
   }
 
 
-  public void nodeHeartbeat(String nodeIdStr, boolean b) throws Exception {
+  public HeartbeatResponse nodeHeartbeat(String nodeIdStr, boolean b) throws Exception {
     String[] splits = nodeIdStr.split(":");
     String[] splits = nodeIdStr.split(":");
     NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
     NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
     nodeId.setHost(splits[0]);
     nodeId.setHost(splits[0]);
     nodeId.setPort(Integer.parseInt(splits[1]));
     nodeId.setPort(Integer.parseInt(splits[1]));
-    nodeHeartbeat(nodeId, new HashMap<ApplicationId, List<Container>>(), b);
+    return nodeHeartbeat(nodeId, new HashMap<ApplicationId, List<Container>>(), b);
   }
   }
 
 
-  public void nodeHeartbeat(NodeId nodeId, Map<ApplicationId, 
+  public HeartbeatResponse nodeHeartbeat(NodeId nodeId, Map<ApplicationId, 
       List<Container>> conts, boolean isHealthy) throws Exception {
       List<Container>> conts, boolean isHealthy) throws Exception {
     NodeHeartbeatRequest req = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
     NodeHeartbeatRequest req = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
     NodeStatus status = recordFactory.newRecordInstance(NodeStatus.class);
     NodeStatus status = recordFactory.newRecordInstance(NodeStatus.class);
@@ -234,9 +259,11 @@ public class MockRM extends ResourceManager {
     healthStatus.setIsNodeHealthy(isHealthy);
     healthStatus.setIsNodeHealthy(isHealthy);
     healthStatus.setLastHealthReportTime(1);
     healthStatus.setLastHealthReportTime(1);
     status.setNodeHealthStatus(healthStatus);
     status.setNodeHealthStatus(healthStatus);
-    status.setResponseId(1);
+    int responseId = responseIds.remove(nodeId) + 1;
+    responseIds.put(nodeId, responseId);
+    status.setResponseId(responseId);
     req.setNodeStatus(status);
     req.setNodeStatus(status);
-    getResourceTrackerService().nodeHeartbeat(req);
+    return getResourceTrackerService().nodeHeartbeat(req).getHeartbeatResponse();
   }
   }
 
 
   @Override
   @Override

+ 77 - 0
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java

@@ -0,0 +1,77 @@
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+/**
+ * Verifies that after an application finishes, the RM tells the node-manager
+ * (via the heartbeat response's applications-to-cleanup list) to clean up the
+ * finished application's resources.
+ */
+public class TestApplicationCleanup {
+
+  @Test
+  public void testAppCleanup() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    MockRM rm = new MockRM();
+    rm.start();
+    NodeId node1 = rm.registerNode("h1", 5000);
+
+    RMApp app = rm.submitApp(2000);
+
+    //kick the scheduling so the AM container gets placed on node1
+    rm.nodeHeartbeat(node1, true);
+
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    rm.sendAMLaunched(attempt.getAppAttemptId());
+    rm.registerAppAttempt(attempt.getAppAttemptId());
+
+    //request for containers
+    int request = 2;
+    rm.allocate(attempt.getAppAttemptId(), "h1", 1000, request,
+        new ArrayList<ContainerId>());
+
+    //kick the scheduler
+    rm.nodeHeartbeat(node1, true);
+    //poll allocate() until all requested containers arrive; bound the wait so
+    //a scheduler bug fails the test instead of hanging it forever
+    int contReceived = 0;
+    int tries = 0;
+    while (contReceived < request && tries++ < 30) {
+      List<Container> conts = rm.allocate(attempt.getAppAttemptId(),
+          new ArrayList<Container>(), new ArrayList<ResourceRequest>());
+      contReceived += conts.size();
+      Log.info("Got " + contReceived + " containers. Waiting to get " + request);
+      Thread.sleep(2000);
+    }
+    //assert on the cumulative count, not just the last allocate() batch:
+    //containers may arrive spread over several allocate calls
+    Assert.assertEquals(request, contReceived);
+
+    rm.unregisterAppAttempt(attempt.getAppAttemptId());
+    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED);
+
+    //heartbeat until the RM hands the node an application-cleanup order,
+    //again with a bounded wait
+    int size = rm.nodeHeartbeat(node1, true).getApplicationsToCleanupList().size();
+    tries = 0;
+    while (size < 1 && tries++ < 30) {
+      Thread.sleep(1000);
+      Log.info("Waiting to get application cleanup..");
+      size = rm.nodeHeartbeat(node1, true).getApplicationsToCleanupList().size();
+    }
+    Assert.assertEquals(1, size);
+
+    rm.stop();
+  }
+
+  public static void main(String[] args) throws Exception {
+    //allow running outside JUnit
+    new TestApplicationCleanup().testAppCleanup();
+  }
+}

+ 71 - 2
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java

@@ -1,5 +1,16 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -10,8 +21,31 @@ import org.junit.Test;
 
 
 public class TestRM {
 public class TestRM {
 
 
+  private static final Log LOG = LogFactory.getLog(TestRM.class);
+
+  /**
+   * An AM that registers and immediately unregisters — without ever asking
+   * for containers — should still drive its attempt to the FINISHED state.
+   */
+  @Test
+  public void testAppWithNoContainers() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    MockRM rm = new MockRM();
+    rm.start();
+    rm.registerNode("h1:1234", 5000);
+    
+    RMApp app = rm.submitApp(2000);
+
+    //kick the scheduling so the AM container is allocated on h1
+    rm.nodeHeartbeat("h1:1234", true);
+
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    rm.sendAMLaunched(attempt.getAppAttemptId());
+    rm.registerAppAttempt(attempt.getAppAttemptId());
+    rm.unregisterAppAttempt(attempt.getAppAttemptId());
+    //waitForState asserts the attempt reaches FINISHED (or times out)
+    rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED);
+    rm.stop();
+  }
+
   @Test
   @Test
-  public void testApp() throws Exception {
+  public void testAppOnMultiNode() throws Exception {
     Logger rootLogger = LogManager.getRootLogger();
     Logger rootLogger = LogManager.getRootLogger();
     rootLogger.setLevel(Level.DEBUG);
     rootLogger.setLevel(Level.DEBUG);
     MockRM rm = new MockRM();
     MockRM rm = new MockRM();
@@ -27,6 +61,40 @@ public class TestRM {
     RMAppAttempt attempt = app.getCurrentAppAttempt();
     RMAppAttempt attempt = app.getCurrentAppAttempt();
     rm.sendAMLaunched(attempt.getAppAttemptId());
     rm.sendAMLaunched(attempt.getAppAttemptId());
     rm.registerAppAttempt(attempt.getAppAttemptId());
     rm.registerAppAttempt(attempt.getAppAttemptId());
+    
+    //request for containers
+    int request = 13;
+    rm.allocate(attempt.getAppAttemptId(), "h1" , 1000, request, 
+        new ArrayList<ContainerId>());
+    
+    //kick the scheduler
+    rm.nodeHeartbeat("h1:1234", true);
+    List<Container> conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+        new ArrayList<ResourceRequest>());
+    int contReceived = conts.size();
+    while (contReceived < 3) {//only 3 containers are available on node1
+      conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+          new ArrayList<ResourceRequest>());
+      contReceived += conts.size();
+      LOG.info("Got " + contReceived + " containers. Waiting to get " + 3);
+      Thread.sleep(2000);
+    }
+    Assert.assertEquals(3, conts.size());
+
+    //send node2 heartbeat
+    rm.nodeHeartbeat("h2:5678", true);
+    conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+        new ArrayList<ResourceRequest>());
+    contReceived = conts.size();
+    while (contReceived < 10) {
+      conts = rm.allocate(attempt.getAppAttemptId(), new ArrayList<Container>(),
+          new ArrayList<ResourceRequest>());
+      contReceived += conts.size();
+      LOG.info("Got " + contReceived + " containers. Waiting to get " + 10);
+      Thread.sleep(2000);
+    }
+    Assert.assertEquals(10, conts.size());
+
     rm.unregisterAppAttempt(attempt.getAppAttemptId());
     rm.unregisterAppAttempt(attempt.getAppAttemptId());
     rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED);
     rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.FINISHED);
 
 
@@ -35,6 +103,7 @@ public class TestRM {
 
 
   public static void main(String[] args) throws Exception {
   public static void main(String[] args) throws Exception {
     TestRM t = new TestRM();
     TestRM t = new TestRM();
-    t.testApp();
+    // run both tests in sequence when invoked directly (outside JUnit)
+    t.testAppWithNoContainers();
+    t.testAppOnMultiNode();
   }
   }
 }
 }