
Merge -r 1176761:1176762 from trunk to branch-0.23 to fix MAPREDUCE-3078.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1176763 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 years ago
parent
commit
e6f5bf6dd8
25 changed files with 822 additions and 548 deletions
  1. + 3 - 0  hadoop-mapreduce-project/CHANGES.txt
  2. + 6 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml
  3. + 2 - 2  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
  4. + 11 - 18  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
  5. + 20 - 1  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
  6. + 32 - 22  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
  7. + 5 - 8  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
  8. + 624 - 445  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
  9. + 24 - 8  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java
  10. + 6 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
  11. + 6 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
  12. + 23 - 1  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
  13. + 14 - 0  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
  14. + 5 - 7  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
  15. + 12 - 4  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  16. + 1 - 1  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
  17. + 1 - 1  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
  18. + 2 - 0  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
  19. + 7 - 8  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
  20. + 0 - 3  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java
  21. + 2 - 2  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  22. + 4 - 6  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
  23. + 8 - 5  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java
  24. + 1 - 0  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
  25. + 3 - 6  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -1437,6 +1437,9 @@ Release 0.23.0 - Unreleased
 
     MAPREDUCE-3110. Fixed TestRPC failure. (vinodkv)
 
+    MAPREDUCE-3078. Ensure MapReduce AM reports progress correctly for
+    displaying on the RM Web-UI. (vinodkv via acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml

@@ -55,6 +55,12 @@
       <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-shuffle</artifactId>

+ 2 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -549,9 +549,9 @@ public class MRAppMaster extends CompositeService {
     // It's more test friendly to put it here.
     DefaultMetricsSystem.initialize("MRAppMaster");
 
-    /** create a job event for job initialization */
+    // create a job event for job initialization
     JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
-    /** send init to the job (this does NOT trigger job execution) */
+    // Send init to the job (this does NOT trigger job execution)
     // This is a synchronous call, not an event through dispatcher. We want
     // job-init to be done completely here.
     jobEventDispatcher.handle(initJobEvent);

+ 11 - 18
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -92,6 +92,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -584,25 +585,17 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   public JobReport getReport() {
     readLock.lock();
     try {
-      JobReport report = recordFactory.newRecordInstance(JobReport.class);
-      report.setJobId(jobId);
-      report.setJobState(getState());
-      
-      // TODO - Fix to correctly setup report and to check state
-      if (report.getJobState() == JobState.NEW) {
-        return report;
+      JobState state = getState();
+
+      if (getState() == JobState.NEW) {
+        return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
+            startTime, finishTime, setupProgress, 0.0f,
+            0.0f, cleanupProgress);
       }
-      
-      report.setStartTime(startTime);
-      report.setFinishTime(finishTime);
-      report.setSetupProgress(setupProgress);
-      report.setCleanupProgress(cleanupProgress);
-      report.setMapProgress(computeProgress(mapTasks));
-      report.setReduceProgress(computeProgress(reduceTasks));
-      report.setJobName(jobName);
-      report.setUser(username);
-
-      return report;
+
+      return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
+          startTime, finishTime, setupProgress, computeProgress(mapTasks),
+          computeProgress(reduceTasks), cleanupProgress);
     } finally {
       readLock.unlock();
     }

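The refactored getReport() above replaces manual record construction with the new MRBuilderUtils.newJobReport factory. Its body is not shown in full in this diff (the MRBuilderUtils.java hunk at the end is truncated), but judging from the argument order at these call sites and the setter calls removed above, it presumably looks roughly like the following sketch; this is an inference, not the committed code:

    import org.apache.hadoop.mapreduce.v2.api.records.JobId;
    import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
    import org.apache.hadoop.mapreduce.v2.api.records.JobState;
    import org.apache.hadoop.yarn.util.Records;

    // Hypothetical sketch of MRBuilderUtils.newJobReport, inferred from the
    // call sites in JobImpl.getReport() and from the Records.newRecord pattern
    // used by newJobId/newTaskId later in this diff.
    public static JobReport newJobReport(JobId jobId, String jobName,
        String userName, JobState state, long startTime, long finishTime,
        float setupProgress, float mapProgress, float reduceProgress,
        float cleanupProgress) {
      JobReport report = Records.newRecord(JobReport.class);
      report.setJobId(jobId);
      report.setJobName(jobName);
      report.setUser(userName);
      report.setJobState(state);
      report.setStartTime(startTime);
      report.setFinishTime(finishTime);
      report.setSetupProgress(setupProgress);
      report.setMapProgress(mapProgress);
      report.setReduceProgress(reduceProgress);
      report.setCleanupProgress(cleanupProgress);
      return report;
    }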
+ 20 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app.local;
 
+import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -30,15 +31,19 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
-import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 
 /**
@@ -65,6 +70,20 @@ public class LocalContainerAllocator extends RMCommunicator
     this.appID = context.getApplicationID();
   }
 
+  @Override
+  protected synchronized void heartbeat() throws Exception {
+    AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
+        this.applicationAttemptId, this.lastResponseID, super
+            .getApplicationProgress(), new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>());
+    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
+    AMResponse response = allocateResponse.getAMResponse();
+    if (response.getReboot()) {
+      // TODO
+      LOG.info("Event from RM: shutting down Application Master");
+    }
+  }
+
   @Override
   public void handle(ContainerAllocatorEvent event) {
     if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {

+ 32 - 22
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.v2.app.rm;
 
 import java.io.IOException;
 import java.security.PrivilegedAction;
-import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,6 +28,7 @@ import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -42,17 +42,12 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -64,7 +59,7 @@ import org.apache.hadoop.yarn.service.AbstractService;
 /**
  * Registers/unregisters to RM and sends heartbeats to RM.
  */
-public class RMCommunicator extends AbstractService  {
+public abstract class RMCommunicator extends AbstractService  {
   private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
   private int rmPollInterval;//millis
   protected ApplicationId applicationId;
@@ -74,7 +69,7 @@ public class RMCommunicator extends AbstractService  {
   protected EventHandler eventHandler;
   protected AMRMProtocol scheduler;
   private final ClientService clientService;
-  private int lastResponseID;
+  protected int lastResponseID;
   private Resource minContainerCapability;
   private Resource maxContainerCapability;
 
@@ -121,6 +116,34 @@ public class RMCommunicator extends AbstractService  {
     return job;
   }
 
+  /**
+   * Get the appProgress. Can be used only after this component is started.
+   * @return the appProgress.
+   */
+  protected float getApplicationProgress() {
+    // For now just a single job. In future when we have a DAG, we need an
+    // aggregate progress.
+    JobReport report = this.job.getReport();
+    float setupWeight = 0.05f;
+    float cleanupWeight = 0.05f;
+    float mapWeight = 0.0f;
+    float reduceWeight = 0.0f;
+    int numMaps = this.job.getTotalMaps();
+    int numReduces = this.job.getTotalReduces();
+    if (numMaps == 0 && numReduces == 0) {
+    } else if (numMaps == 0) {
+      reduceWeight = 0.9f;
+    } else if (numReduces == 0) {
+      mapWeight = 0.9f;
+    } else {
+      mapWeight = reduceWeight = 0.45f;
+    }
+    return (report.getSetupProgress() * setupWeight
+        + report.getCleanupProgress() * cleanupWeight
+        + report.getMapProgress() * mapWeight + report.getReduceProgress()
+        * reduceWeight);
+  }
+
   protected void register() {
     //Register
     String host = 
@@ -262,18 +285,5 @@ public class RMCommunicator extends AbstractService  {
     });
   }
 
-  protected synchronized void heartbeat() throws Exception {
-    AllocateRequest allocateRequest =
-        recordFactory.newRecordInstance(AllocateRequest.class);
-    allocateRequest.setApplicationAttemptId(applicationAttemptId);
-    allocateRequest.setResponseId(lastResponseID);
-    allocateRequest.addAllAsks(new ArrayList<ResourceRequest>());
-    allocateRequest.addAllReleases(new ArrayList<ContainerId>());
-    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
-    AMResponse response = allocateResponse.getAMResponse();
-    if (response.getReboot()) {
-      LOG.info("Event from RM: shutting down Application Master");
-    }
-  }
-
+  protected abstract void heartbeat() throws Exception;
 }

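The new getApplicationProgress() above weights the job phases: setup and cleanup contribute 5% each, and the remaining 90% is split between maps and reduces (45% each when both exist, or all 90% to whichever phase exists alone). A minimal, self-contained sketch of that arithmetic, using plain floats in place of the JobReport record and hypothetical input values:

    // Standalone sketch of the progress weighting in getApplicationProgress().
    // Phase-progress values and task counts below are hypothetical inputs.
    public class ProgressWeightingSketch {

      static float weightedProgress(float setup, float map, float reduce,
          float cleanup, int numMaps, int numReduces) {
        float setupWeight = 0.05f, cleanupWeight = 0.05f;
        float mapWeight = 0.0f, reduceWeight = 0.0f;
        if (numMaps == 0 && numReduces == 0) {
          // no tasks at all: only setup and cleanup contribute
        } else if (numMaps == 0) {
          reduceWeight = 0.9f;
        } else if (numReduces == 0) {
          mapWeight = 0.9f;
        } else {
          mapWeight = reduceWeight = 0.45f;
        }
        return setup * setupWeight + cleanup * cleanupWeight
            + map * mapWeight + reduce * reduceWeight;
      }

      public static void main(String[] args) {
        // Setup done, maps 10% complete, with both maps and reduces present:
        // 0.05 * 100 + 0.45 * 10 = 9.5, matching the first assertion in
        // testReportedAppProgress further down in this diff.
        System.out.println(weightedProgress(100, 10, 0, 0, 2, 2));
      }
    }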
+ 5 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
 /**
  * Keeps the data structures to send container requests to RM.
@@ -107,15 +108,11 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
   }
 
-  protected abstract void heartbeat() throws Exception;
-
   protected AMResponse makeRemoteRequest() throws YarnRemoteException {
-    AllocateRequest allocateRequest = recordFactory
-        .newRecordInstance(AllocateRequest.class);
-    allocateRequest.setApplicationAttemptId(applicationAttemptId);
-    allocateRequest.setResponseId(lastResponseID);
-    allocateRequest.addAllAsks(new ArrayList<ResourceRequest>(ask));
-    allocateRequest.addAllReleases(new ArrayList<ContainerId>(release));
+    AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
+        applicationAttemptId, lastResponseID, super.getApplicationProgress(),
+        new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(
+            release));
     AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
     AMResponse response = allocateResponse.getAMResponse();
     lastResponseID = response.getResponseId();

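Both allocators now build their AllocateRequest through the new BuilderUtils.newAllocateRequest helper (added in BuilderUtils.java, whose hunk is not shown here), so that the application progress is carried on every AM heartbeat. Judging from the call sites and the setters in the code removed above, the helper presumably does something like the sketch below; the setProgress call in particular is an assumption, not confirmed by this diff:

    import java.util.List;
    import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
    import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;
    import org.apache.hadoop.yarn.util.Records;

    // Hypothetical sketch of BuilderUtils.newAllocateRequest, inferred from
    // the call sites in LocalContainerAllocator and RMContainerRequestor.
    public static AllocateRequest newAllocateRequest(
        ApplicationAttemptId applicationAttemptId, int responseID,
        float appProgress, List<ResourceRequest> resourceAsk,
        List<ContainerId> containersToBeReleased) {
      AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
      allocateRequest.setApplicationAttemptId(applicationAttemptId);
      allocateRequest.setResponseId(responseID);
      allocateRequest.setProgress(appProgress);  // assumed accessor
      allocateRequest.addAllAsks(resourceAsk);
      allocateRequest.addAllReleases(containersToBeReleased);
      return allocateRequest;
    }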
+ 624 - 445
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

@@ -18,12 +18,15 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import junit.framework.Assert;
@@ -32,475 +35,651 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationMaster;
-import org.apache.hadoop.yarn.api.records.ApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
-import org.junit.BeforeClass;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.After;
 import org.junit.Test;
 
 public class TestRMContainerAllocator {
-//  private static final Log LOG = LogFactory.getLog(TestRMContainerAllocator.class);
-//  private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-//
-//  @BeforeClass
-//  public static void preTests() {
-//    DefaultMetricsSystem.shutdown();
-//  }
-//
-//  @Test
-//  public void testSimple() throws Exception {
-//    FifoScheduler scheduler = createScheduler();
-//    LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
-//        scheduler, new Configuration());
-//
-//    //add resources to scheduler
-//    RMNode nodeManager1 = addNode(scheduler, "h1", 10240);
-//    RMNode nodeManager2 = addNode(scheduler, "h2", 10240);
-//    RMNode nodeManager3 = addNode(scheduler, "h3", 10240);
-//
-//    //create the container request
-//    ContainerRequestEvent event1 = 
-//      createReq(1, 1024, new String[]{"h1"});
-//    allocator.sendRequest(event1);
-//
-//    //send 1 more request with different resource req
-//    ContainerRequestEvent event2 = createReq(2, 1024, new String[]{"h2"});
-//    allocator.sendRequest(event2);
-//
-//    //this tells the scheduler about the requests
-//    //as nodes are not added, no allocations
-//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-//
-//    //send another request with different resource and priority
-//    ContainerRequestEvent event3 = createReq(3, 1024, new String[]{"h3"});
-//    allocator.sendRequest(event3);
-//
-//    //this tells the scheduler about the requests
-//    //as nodes are not added, no allocations
-//    assigned = allocator.schedule();
-//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-//
-//    //update resources in scheduler
-//    scheduler.nodeUpdate(nodeManager1); // Node heartbeat
-//    scheduler.nodeUpdate(nodeManager2); // Node heartbeat
-//    scheduler.nodeUpdate(nodeManager3); // Node heartbeat
-//
-//
-//    assigned = allocator.schedule();
-//    checkAssignments(
-//        new ContainerRequestEvent[]{event1, event2, event3}, assigned, false);
-//  }
-//
-//  //TODO: Currently Scheduler seems to have bug where it does not work
-//  //for Application asking for containers with different capabilities.
-//  //@Test
-//  public void testResource() throws Exception {
-//    FifoScheduler scheduler = createScheduler();
-//    LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
-//        scheduler, new Configuration());
-//
-//    //add resources to scheduler
-//    RMNode nodeManager1 = addNode(scheduler, "h1", 10240);
-//    RMNode nodeManager2 = addNode(scheduler, "h2", 10240);
-//    RMNode nodeManager3 = addNode(scheduler, "h3", 10240);
-//
-//    //create the container request
-//    ContainerRequestEvent event1 = 
-//      createReq(1, 1024, new String[]{"h1"});
-//    allocator.sendRequest(event1);
-//
-//    //send 1 more request with different resource req
-//    ContainerRequestEvent event2 = createReq(2, 2048, new String[]{"h2"});
-//    allocator.sendRequest(event2);
-//
-//    //this tells the scheduler about the requests
-//    //as nodes are not added, no allocations
-//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-//
-//    //update resources in scheduler
-//    scheduler.nodeUpdate(nodeManager1); // Node heartbeat
-//    scheduler.nodeUpdate(nodeManager2); // Node heartbeat
-//    scheduler.nodeUpdate(nodeManager3); // Node heartbeat
-//
-//    assigned = allocator.schedule();
-//    checkAssignments(
-//        new ContainerRequestEvent[]{event1, event2}, assigned, false);
-//  }
-//
-//  @Test
-//  public void testMapReduceScheduling() throws Exception {
-//    FifoScheduler scheduler = createScheduler();
-//    Configuration conf = new Configuration();
-//    LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
-//        scheduler, conf);
-//
-//    //add resources to scheduler
-//    RMNode nodeManager1 = addNode(scheduler, "h1", 1024);
-//    RMNode nodeManager2 = addNode(scheduler, "h2", 10240);
-//    RMNode nodeManager3 = addNode(scheduler, "h3", 10240);
-//
-//    //create the container request
-//    //send MAP request
-//    ContainerRequestEvent event1 = 
-//      createReq(1, 2048, new String[]{"h1", "h2"}, true, false);
-//    allocator.sendRequest(event1);
-//
-//    //send REDUCE request
-//    ContainerRequestEvent event2 = createReq(2, 3000, new String[]{"h1"}, false, true);
-//    allocator.sendRequest(event2);
-//
-//    //send MAP request
-//    ContainerRequestEvent event3 = createReq(3, 2048, new String[]{"h3"}, false, false);
-//    allocator.sendRequest(event3);
-//
-//    //this tells the scheduler about the requests
-//    //as nodes are not added, no allocations
-//    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
-//    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-//
-//    //update resources in scheduler
-//    scheduler.nodeUpdate(nodeManager1); // Node heartbeat
-//    scheduler.nodeUpdate(nodeManager2); // Node heartbeat
-//    scheduler.nodeUpdate(nodeManager3); // Node heartbeat
-//
-//    assigned = allocator.schedule();
-//    checkAssignments(
-//        new ContainerRequestEvent[]{event1, event3}, assigned, false);
-//
-//    //validate that no container is assigned to h1 as it doesn't have 2048
-//    for (TaskAttemptContainerAssignedEvent assig : assigned) {
-//      Assert.assertFalse("Assigned count not correct", 
-//          "h1".equals(assig.getContainer().getNodeId().getHost()));
-//    }
-//  }
-//
-//
-//
-//  private RMNode addNode(FifoScheduler scheduler, 
-//      String nodeName, int memory) {
-//    NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
-//    nodeId.setHost(nodeName);
-//    nodeId.setPort(1234);
-//    Resource resource = recordFactory.newRecordInstance(Resource.class);
-//    resource.setMemory(memory);
-//    RMNode nodeManager = new RMNodeImpl(nodeId, null, nodeName, 0, 0,
-//        ResourceTrackerService.resolve(nodeName), resource);
-//    scheduler.addNode(nodeManager); // Node registration
-//    return nodeManager;
-//  }
-//
-//  private FifoScheduler createScheduler() throws YarnRemoteException {
-//    FifoScheduler fsc = new FifoScheduler() {
-//      //override this to copy the objects
-//      //otherwise FifoScheduler updates the numContainers in same objects as kept by
-//      //RMContainerAllocator
-//      
-//      @Override
-//      public synchronized void allocate(ApplicationAttemptId applicationId,
-//          List<ResourceRequest> ask) {
-//        List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
-//        for (ResourceRequest req : ask) {
-//          ResourceRequest reqCopy = recordFactory.newRecordInstance(ResourceRequest.class);
-//          reqCopy.setPriority(req.getPriority());
-//          reqCopy.setHostName(req.getHostName());
-//          reqCopy.setCapability(req.getCapability());
-//          reqCopy.setNumContainers(req.getNumContainers());
-//          askCopy.add(reqCopy);
-//        }
-//        super.allocate(applicationId, askCopy);
-//      }
-//    };
-//    try {
-//      fsc.reinitialize(new Configuration(), new ContainerTokenSecretManager(), null);
-//      fsc.addApplication(recordFactory.newRecordInstance(ApplicationId.class),
-//          recordFactory.newRecordInstance(ApplicationMaster.class),
-//          "test", null, null, StoreFactory.createVoidAppStore());
-//    } catch(IOException ie) {
-//      LOG.info("add application failed with ", ie);
-//      assert(false);
-//    }
-//    return fsc;
-//  }
-//
-//  private ContainerRequestEvent createReq(
-//      int attemptid, int memory, String[] hosts) {
-//    return createReq(attemptid, memory, hosts, false, false);
-//  }
-//  
-//  private ContainerRequestEvent createReq(
-//      int attemptid, int memory, String[] hosts, boolean earlierFailedAttempt, boolean reduce) {
-//    ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
-//    appId.setClusterTimestamp(0);
-//    appId.setId(0);
-//    JobId jobId = recordFactory.newRecordInstance(JobId.class);
-//    jobId.setAppId(appId);
-//    jobId.setId(0);
-//    TaskId taskId = recordFactory.newRecordInstance(TaskId.class);
-//    taskId.setId(0);
-//    taskId.setJobId(jobId);
-//    if (reduce) {
-//      taskId.setTaskType(TaskType.REDUCE);
-//    } else {
-//      taskId.setTaskType(TaskType.MAP);
-//    }
-//    TaskAttemptId attemptId = recordFactory.newRecordInstance(TaskAttemptId.class);
-//    attemptId.setId(attemptid);
-//    attemptId.setTaskId(taskId);
-//    Resource containerNeed = recordFactory.newRecordInstance(Resource.class);
-//    containerNeed.setMemory(memory);
-//    if (earlierFailedAttempt) {
-//      return ContainerRequestEvent.
-//           createContainerRequestEventForFailedContainer(attemptId, containerNeed);
-//    }
-//    return new ContainerRequestEvent(attemptId, 
-//        containerNeed, 
-//        hosts, new String[] {NetworkTopology.DEFAULT_RACK});
-//  }
-//
-//  private void checkAssignments(ContainerRequestEvent[] requests, 
-//      List<TaskAttemptContainerAssignedEvent> assignments, 
-//      boolean checkHostMatch) {
-//    Assert.assertNotNull("Container not assigned", assignments);
-//    Assert.assertEquals("Assigned count not correct", 
-//        requests.length, assignments.size());
-//
-//    //check for uniqueness of containerIDs
-//    Set<ContainerId> containerIds = new HashSet<ContainerId>();
-//    for (TaskAttemptContainerAssignedEvent assigned : assignments) {
-//      containerIds.add(assigned.getContainer().getId());
-//    }
-//    Assert.assertEquals("Assigned containers must be different", 
-//        assignments.size(), containerIds.size());
-//
-//    //check for all assignment
-//    for (ContainerRequestEvent req : requests) {
-//      TaskAttemptContainerAssignedEvent assigned = null;
-//      for (TaskAttemptContainerAssignedEvent ass : assignments) {
-//        if (ass.getTaskAttemptID().equals(req.getAttemptID())){
-//          assigned = ass;
-//          break;
-//        }
-//      }
-//      checkAssignment(req, assigned, checkHostMatch);
-//    }
-//  }
-//
-//  private void checkAssignment(ContainerRequestEvent request, 
-//      TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) {
-//    Assert.assertNotNull("Nothing assigned to attempt " + request.getAttemptID(),
-//        assigned);
-//    Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(),
-//        assigned.getTaskAttemptID());
-//    if (checkHostMatch) {
-//      Assert.assertTrue("Not assigned to requested host", Arrays.asList(
-//          request.getHosts()).contains(
-//          assigned.getContainer().getNodeId().toString()));
-//    }
-//
-//  }
-//
-//  //Mock RMContainerAllocator
-//  //Instead of talking to remote Scheduler,uses the local Scheduler
-//  public static class LocalRMContainerAllocator extends RMContainerAllocator {
-//    private static final List<TaskAttemptContainerAssignedEvent> events = 
-//      new ArrayList<TaskAttemptContainerAssignedEvent>();
-//
-//    public static class AMRMProtocolImpl implements AMRMProtocol {
-//
-//      private ResourceScheduler resourceScheduler;
-//
-//      public AMRMProtocolImpl(ResourceScheduler resourceScheduler) {
-//        this.resourceScheduler = resourceScheduler;
-//      }
-//
-//      @Override
-//      public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request) throws YarnRemoteException {
-//        RegisterApplicationMasterResponse response = recordFactory.newRecordInstance(RegisterApplicationMasterResponse.class);
-//        return response;
-//      }
-//
-//      public AllocateResponse allocate(AllocateRequest request) throws YarnRemoteException {
-//        List<ResourceRequest> ask = request.getAskList();
-//        List<Container> release = request.getReleaseList();
-//        try {
-//          AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
-//          Allocation allocation = resourceScheduler.allocate(request.getApplicationAttemptId(), ask);
-//          response.addAllNewContainers(allocation.getContainers());
-//          response.setAvailableResources(allocation.getResourceLimit());
-//          AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class);
-//          allocateResponse.setAMResponse(response);
-//          return allocateResponse;
-//        } catch(IOException ie) {
-//          throw RPCUtil.getRemoteException(ie);
-//        }
-//      }
-//
-//      @Override
-//      public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request) throws YarnRemoteException {
-//        FinishApplicationMasterResponse response = recordFactory.newRecordInstance(FinishApplicationMasterResponse.class);
-//        return response;
-//      }
-//
-//    }
-//
-//    private ResourceScheduler scheduler;
-//    LocalRMContainerAllocator(ResourceScheduler scheduler, Configuration conf) {
-//      super(null, new TestContext(events));
-//      this.scheduler = scheduler;
-//      super.init(conf);
-//      super.start();
-//    }
-//
-//    protected AMRMProtocol createSchedulerProxy() {
-//      return new AMRMProtocolImpl(scheduler);
-//    }
-//
-//    @Override
-//    protected void register() {}
-//    @Override
-//    protected void unregister() {}
-//
-//    @Override
-//    protected Resource getMinContainerCapability() {
-//      Resource res = recordFactory.newRecordInstance(Resource.class);
-//      res.setMemory(1024);
-//      return res;
-//    }
-//    
-//    @Override
-//    protected Resource getMaxContainerCapability() {
-//      Resource res = recordFactory.newRecordInstance(Resource.class);
-//      res.setMemory(10240);
-//      return res;
-//    }
-//    
-//    public void sendRequest(ContainerRequestEvent req) {
-//      sendRequests(Arrays.asList(new ContainerRequestEvent[]{req}));
-//    }
-//
-//    public void sendRequests(List<ContainerRequestEvent> reqs) {
-//      for (ContainerRequestEvent req : reqs) {
-//        handle(req);
-//      }
-//    }
-//
-//    //API to be used by tests
-//    public List<TaskAttemptContainerAssignedEvent> schedule() {
-//      //run the scheduler
-//      try {
-//        heartbeat();
-//      } catch (Exception e) {
-//        LOG.error("error in heartbeat ", e);
-//        throw new YarnException(e);
-//      }
-//
-//      List<TaskAttemptContainerAssignedEvent> result = new ArrayList(events);
-//      events.clear();
-//      return result;
-//    }
-//
-//    protected void startAllocatorThread() {
-//      //override to NOT start thread
-//    }
-//
-//    static class TestContext implements AppContext {
-//      private List<TaskAttemptContainerAssignedEvent> events;
-//      TestContext(List<TaskAttemptContainerAssignedEvent> events) {
-//        this.events = events;
-//      }
-//      @Override
-//      public Map<JobId, Job> getAllJobs() {
-//        return null;
-//      }
-//      @Override
-//      public ApplicationAttemptId getApplicationAttemptId() {
-//        return recordFactory.newRecordInstance(ApplicationAttemptId.class);
-//      }
-//      @Override
-//      public ApplicationId getApplicationID() {
-//        return recordFactory.newRecordInstance(ApplicationId.class);
-//      }
-//      @Override
-//      public EventHandler getEventHandler() {
-//        return new EventHandler() {
-//          @Override
-//          public void handle(Event event) {
-//            events.add((TaskAttemptContainerAssignedEvent) event);
-//          }
-//        };
-//      }
-//      @Override
-//      public Job getJob(JobId jobID) {
-//        return null;
-//      }
-//
-//      @Override
-//      public String getUser() {
-//        return null;
-//      }
-//
-//      @Override
-//      public Clock getClock() {
-//        return null;
-//      }
-//
-//      @Override
-//      public String getApplicationName() {
-//        return null;
-//      }
-//
-//      @Override
-//      public long getStartTime() {
-//        return 0;
-//      }
-//    }
-//  }
-//
-//  public static void main(String[] args) throws Exception {
-//    TestRMContainerAllocator t = new TestRMContainerAllocator();
-//    t.testSimple();
-//    //t.testResource();
-//    t.testMapReduceScheduling();
-//  }
+
+  static final Log LOG = LogFactory
+      .getLog(TestRMContainerAllocator.class);
+  static final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+
+  @After
+  public void tearDown() {
+    DefaultMetricsSystem.shutdown();
+  }
+
+  @Test
+  public void testSimple() throws Exception {
+
+    LOG.info("Running testSimple");
+
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
+            0, 0, 0, 0, 0, 0));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // add resources to scheduler
+    MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+    MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+    MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+    dispatcher.await();
+
+    // create the container request
+    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    allocator.sendRequest(event1);
+
+    // send 1 more request with different resource req
+    ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
+        new String[] { "h2" });
+    allocator.sendRequest(event2);
+
+    // this tells the scheduler about the requests
+    // as nodes are not added, no allocations
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // send another request with different resource and priority
+    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+        new String[] { "h3" });
+    allocator.sendRequest(event3);
+
+    // this tells the scheduler about the requests
+    // as nodes are not added, no allocations
+    assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // update resources in scheduler
+    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+    nodeManager2.nodeHeartbeat(true); // Node heartbeat
+    nodeManager3.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    assigned = allocator.schedule();
+    dispatcher.await();
+    checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
+        assigned, false);
+  }
+
+  @Test
+  public void testResource() throws Exception {
+
+    LOG.info("Running testResource");
+
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
+            0, 0, 0, 0, 0, 0));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // add resources to scheduler
+    MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+    MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+    MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+    dispatcher.await();
+
+    // create the container request
+    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    allocator.sendRequest(event1);
+
+    // send 1 more request with different resource req
+    ContainerRequestEvent event2 = createReq(jobId, 2, 2048,
+        new String[] { "h2" });
+    allocator.sendRequest(event2);
+
+    // this tells the scheduler about the requests
+    // as nodes are not added, no allocations
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // update resources in scheduler
+    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+    nodeManager2.nodeHeartbeat(true); // Node heartbeat
+    nodeManager3.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    assigned = allocator.schedule();
+    dispatcher.await();
+    checkAssignments(new ContainerRequestEvent[] { event1, event2 },
+        assigned, false);
+  }
+
+  @Test
+  public void testMapReduceScheduling() throws Exception {
+
+    LOG.info("Running testMapReduceScheduling");
+
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
+            0, 0, 0, 0, 0, 0));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // add resources to scheduler
+    MockNM nodeManager1 = rm.registerNode("h1:1234", 1024);
+    MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+    MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+    dispatcher.await();
+
+    // create the container request
+    // send MAP request
+    ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] {
+        "h1", "h2" }, true, false);
+    allocator.sendRequest(event1);
+
+    // send REDUCE request
+    ContainerRequestEvent event2 = createReq(jobId, 2, 3000,
+        new String[] { "h1" }, false, true);
+    allocator.sendRequest(event2);
+
+    // send MAP request
+    ContainerRequestEvent event3 = createReq(jobId, 3, 2048,
+        new String[] { "h3" }, false, false);
+    allocator.sendRequest(event3);
+
+    // this tells the scheduler about the requests
+    // as nodes are not added, no allocations
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // update resources in scheduler
+    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+    nodeManager2.nodeHeartbeat(true); // Node heartbeat
+    nodeManager3.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    assigned = allocator.schedule();
+    dispatcher.await();
+    checkAssignments(new ContainerRequestEvent[] { event1, event3 },
+        assigned, false);
+
+    // validate that no container is assigned to h1 as it doesn't have 2048
+    for (TaskAttemptContainerAssignedEvent assig : assigned) {
+      Assert.assertFalse("Assigned count not correct", "h1".equals(assig
+          .getContainer().getNodeId().getHost()));
+    }
+  }
+
+  private static class MyResourceManager extends MockRM {
+
+    public MyResourceManager(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    protected Dispatcher createDispatcher() {
+      return new DrainDispatcher();
+    }
+
+    @Override
+    protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+      // Dispatch inline for test sanity
+      return new EventHandler<SchedulerEvent>() {
+        @Override
+        public void handle(SchedulerEvent event) {
+          scheduler.handle(event);
+        }
+      };
+    }
+    @Override
+    protected ResourceScheduler createScheduler() {
+      return new MyFifoScheduler(getRMContext());
+    }
+  }
+
+  private static class FakeJob extends JobImpl {
+
+    public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf,
+        int numMaps, int numReduces) {
+      super(appAttemptID, conf, null, null, null, null, null, null, null,
+          null);
+      this.jobId = MRBuilderUtils
+          .newJobId(appAttemptID.getApplicationId(), 0);
+      this.numMaps = numMaps;
+      this.numReduces = numReduces;
+    }
+
+    private float setupProgress;
+    private float mapProgress;
+    private float reduceProgress;
+    private float cleanupProgress;
+    private final int numMaps;
+    private final int numReduces;
+    private JobId jobId;
+
+    void setProgress(float setupProgress, float mapProgress,
+        float reduceProgress, float cleanupProgress) {
+      this.setupProgress = setupProgress;
+      this.mapProgress = mapProgress;
+      this.reduceProgress = reduceProgress;
+      this.cleanupProgress = cleanupProgress;
+    }
+
+    @Override
+    public int getTotalMaps() { return this.numMaps; }
+    @Override
+    public int getTotalReduces() { return this.numReduces;}
+
+    @Override
+    public JobReport getReport() {
+      return MRBuilderUtils.newJobReport(this.jobId, "job", "user",
+          JobState.RUNNING, 0, 0, this.setupProgress, this.mapProgress,
+          this.reduceProgress, this.cleanupProgress);
+    }
+  }
+
+  @Test
+  public void testReportedAppProgress() throws Exception {
+
+    LOG.info("Running testReportedAppProgress");
+
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    FakeJob job = new FakeJob(appAttemptId, conf, 2, 2);
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, job);
+
+    allocator.schedule(); // Send heartbeat
+    dispatcher.await();
+    Assert.assertEquals(0.0, app.getProgress(), 0.0);
+
+    job.setProgress(100, 10, 0, 0);
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(9.5f, app.getProgress(), 0.0);
+
+    job.setProgress(100, 80, 0, 0);
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(41.0f, app.getProgress(), 0.0);
+
+    job.setProgress(100, 100, 20, 0);
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(59.0f, app.getProgress(), 0.0);
+
+    job.setProgress(100, 100, 100, 100);
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(100.0f, app.getProgress(), 0.0);
+  }
+
+  @Test
+  public void testReportedAppProgressWithOnlyMaps() throws Exception {
+
+    LOG.info("Running testReportedAppProgressWithOnlyMaps");
+
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    FakeJob job = new FakeJob(appAttemptId, conf, 2, 0);
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, job);
+
+    allocator.schedule(); // Send heartbeat
+    dispatcher.await();
+    Assert.assertEquals(0.0, app.getProgress(), 0.0);
+
+    job.setProgress(100, 10, 0, 0);
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(14f, app.getProgress(), 0.0);
+
+    job.setProgress(100, 60, 0, 0);
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(59.0f, app.getProgress(), 0.0);
+
+    job.setProgress(100, 100, 0, 100);
+    allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(100.0f, app.getProgress(), 0.0);
+  }
+
+  private static class MyFifoScheduler extends FifoScheduler {
+
+    public MyFifoScheduler(RMContext rmContext) {
+      super();
+      try {
+        reinitialize(new Configuration(), new ContainerTokenSecretManager(),
+            rmContext);
+      } catch (IOException ie) {
+        LOG.info("add application failed with ", ie);
+        assert (false);
+      }
+    }
+
+    // override this to copy the objects otherwise FifoScheduler updates the
+    // numContainers in same objects as kept by RMContainerAllocator
+    @Override
+    public synchronized Allocation allocate(
+        ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
+        List<ContainerId> release) {
+      List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
+      for (ResourceRequest req : ask) {
+        ResourceRequest reqCopy = BuilderUtils.newResourceRequest(req
+            .getPriority(), req.getHostName(), req.getCapability(), req
+            .getNumContainers());
+        askCopy.add(reqCopy);
+      }
+      return super.allocate(applicationAttemptId, askCopy, release);
+    }
+  }
+
+  private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
+      int memory, String[] hosts) {
+    return createReq(jobId, taskAttemptId, memory, hosts, false, false);
+  }
+
+  private ContainerRequestEvent
+      createReq(JobId jobId, int taskAttemptId, int memory, String[] hosts,
+          boolean earlierFailedAttempt, boolean reduce) {
+    TaskId taskId;
+    if (reduce) {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+    } else {
+      taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    }
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+        taskAttemptId);
+    Resource containerNeed = BuilderUtils.newResource(memory);
+    if (earlierFailedAttempt) {
+      return ContainerRequestEvent
+          .createContainerRequestEventForFailedContainer(attemptId,
+              containerNeed);
+    }
+    return new ContainerRequestEvent(attemptId, containerNeed, hosts,
+        new String[] { NetworkTopology.DEFAULT_RACK });
+  }
+
+  private void checkAssignments(ContainerRequestEvent[] requests,
+      List<TaskAttemptContainerAssignedEvent> assignments,
+      boolean checkHostMatch) {
+    Assert.assertNotNull("Container not assigned", assignments);
+    Assert.assertEquals("Assigned count not correct", requests.length,
+        assignments.size());
+
+    // check for uniqueness of containerIDs
+    Set<ContainerId> containerIds = new HashSet<ContainerId>();
+    for (TaskAttemptContainerAssignedEvent assigned : assignments) {
+      containerIds.add(assigned.getContainer().getId());
+    }
+    Assert.assertEquals("Assigned containers must be different", assignments
+        .size(), containerIds.size());
+
+    // check for all assignment
+    for (ContainerRequestEvent req : requests) {
+      TaskAttemptContainerAssignedEvent assigned = null;
+      for (TaskAttemptContainerAssignedEvent ass : assignments) {
+        if (ass.getTaskAttemptID().equals(req.getAttemptID())) {
+          assigned = ass;
+          break;
+        }
+      }
+      checkAssignment(req, assigned, checkHostMatch);
+    }
+  }
+
+  private void checkAssignment(ContainerRequestEvent request,
+      TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) {
+    Assert.assertNotNull("Nothing assigned to attempt "
+        + request.getAttemptID(), assigned);
+    Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(),
+        assigned.getTaskAttemptID());
+    if (checkHostMatch) {
+      Assert.assertTrue("Not assigned to requested host", Arrays.asList(
+          request.getHosts()).contains(
+          assigned.getContainer().getNodeId().toString()));
+    }
+  }
+
+  // Mock RMContainerAllocator
+  // Instead of talking to remote Scheduler, uses the local Scheduler
+  private static class MyContainerAllocator extends RMContainerAllocator {
+    static final List<TaskAttemptContainerAssignedEvent> events
+      = new ArrayList<TaskAttemptContainerAssignedEvent>();
+
+    private MyResourceManager rm;
+
+    @SuppressWarnings("rawtypes")
+    private static AppContext createAppContext(
+        ApplicationAttemptId appAttemptId, Job job) {
+      AppContext context = mock(AppContext.class);
+      ApplicationId appId = appAttemptId.getApplicationId();
+      when(context.getApplicationID()).thenReturn(appId);
+      when(context.getApplicationAttemptId()).thenReturn(appAttemptId);
+      when(context.getJob(isA(JobId.class))).thenReturn(job);
+      when(context.getEventHandler()).thenReturn(new EventHandler() {
+        @Override
+        public void handle(Event event) {
+          // Only capture interesting events.
+          if (event instanceof TaskAttemptContainerAssignedEvent) {
+            events.add((TaskAttemptContainerAssignedEvent) event);
+          }
+        }
+      });
+      return context;
+    }
+
+    private static ClientService createMockClientService() {
+      ClientService service = mock(ClientService.class);
+      when(service.getBindAddress()).thenReturn(
+          NetUtils.createSocketAddr("localhost:4567"));
+      when(service.getHttpPort()).thenReturn(890);
+      return service;
+    }
+
+    MyContainerAllocator(MyResourceManager rm, Configuration conf,
+        ApplicationAttemptId appAttemptId, Job job) {
+      super(createMockClientService(), createAppContext(appAttemptId, job));
+      this.rm = rm;
+      super.init(conf);
+      super.start();
+    }
+
+    @Override
+    protected AMRMProtocol createSchedulerProxy() {
+      return this.rm.getApplicationMasterService();
+    }
+
+    @Override
+    protected void register() {
+      super.register();
+    }
+
+    @Override
+    protected void unregister() {
+    }
+
+    @Override
+    protected Resource getMinContainerCapability() {
+      return BuilderUtils.newResource(1024);
+    }
+
+    @Override
+    protected Resource getMaxContainerCapability() {
+      return BuilderUtils.newResource(10240);
+    }
+
+    public void sendRequest(ContainerRequestEvent req) {
+      sendRequests(Arrays.asList(new ContainerRequestEvent[] { req }));
+    }
+
+    public void sendRequests(List<ContainerRequestEvent> reqs) {
+      for (ContainerRequestEvent req : reqs) {
+        super.handle(req);
+      }
+    }
+
+    // API to be used by tests
+    public List<TaskAttemptContainerAssignedEvent> schedule() {
+      // run the scheduler
+      try {
+        super.heartbeat();
+      } catch (Exception e) {
+        LOG.error("error in heartbeat ", e);
+        throw new YarnException(e);
+      }
+
+      List<TaskAttemptContainerAssignedEvent> result
+        = new ArrayList<TaskAttemptContainerAssignedEvent>(events);
+      events.clear();
+      return result;
+    }
+
+    protected void startAllocatorThread() {
+      // override to NOT start thread
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestRMContainerAllocator t = new TestRMContainerAllocator();
+    t.testSimple();
+    t.testResource();
+    t.testMapReduceScheduling();
+    t.testReportedAppProgress();
+    t.testReportedAppProgressWithOnlyMaps();
+  }
 }
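
The mock allocator above is driven synchronously: a test queues ContainerRequestEvents, calls schedule() to fire one AM heartbeat, lets a mock node heartbeat so the scheduler has capacity to hand out, then calls schedule() again to collect the TaskAttemptContainerAssignedEvents captured by the mocked EventHandler. A minimal sketch of that cycle, not part of the patch; `allocator`, `taskId` and `nodeManager` are assumed to come from the test setup earlier in this file, and the MockNM-style nodeHeartbeat(boolean) call is an assumption:

    // Sketch only; identifiers not shown in this hunk are assumed from the test setup.
    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 1);
    ContainerRequestEvent req = new ContainerRequestEvent(attemptId,
        BuilderUtils.newResource(1024), new String[] { "h1" },
        new String[] { NetworkTopology.DEFAULT_RACK });
    allocator.sendRequest(req);            // register the ask with the allocator
    allocator.schedule();                  // heartbeat 1: the ask reaches the mock RM
    nodeManager.nodeHeartbeat(true);       // node offers capacity to the scheduler (assumed helper)
    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule(); // heartbeat 2: assignment comes back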

+ 24 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java

@@ -19,27 +19,25 @@
 package org.apache.hadoop.mapreduce.v2.util;
 
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Records;
 
 public class MRBuilderUtils {
 
-  private static final RecordFactory recordFactory = RecordFactoryProvider
-      .getRecordFactory(null);
-
   public static JobId newJobId(ApplicationId appId, int id) {
-    JobId jobId = recordFactory.newRecordInstance(JobId.class);
+    JobId jobId = Records.newRecord(JobId.class);
     jobId.setAppId(appId);
     jobId.setId(id);
     return jobId;
   }
 
   public static TaskId newTaskId(JobId jobId, int id, TaskType taskType) {
-    TaskId taskId = recordFactory.newRecordInstance(TaskId.class);
+    TaskId taskId = Records.newRecord(TaskId.class);
     taskId.setJobId(jobId);
     taskId.setId(id);
     taskId.setTaskType(taskType);
@@ -48,9 +46,27 @@ public class MRBuilderUtils {
 
   public static TaskAttemptId newTaskAttemptId(TaskId taskId, int attemptId) {
     TaskAttemptId taskAttemptId =
-        recordFactory.newRecordInstance(TaskAttemptId.class);
+        Records.newRecord(TaskAttemptId.class);
     taskAttemptId.setTaskId(taskId);
     taskAttemptId.setId(attemptId);
     return taskAttemptId;
   }
+
+  public static JobReport newJobReport(JobId jobId, String jobName,
+      String userName, JobState state, long startTime, long finishTime,
+      float setupProgress, float mapProgress, float reduceProgress,
+      float cleanupProgress) {
+    JobReport report = Records.newRecord(JobReport.class);
+    report.setJobId(jobId);
+    report.setJobName(jobName);
+    report.setUser(userName);
+    report.setJobState(state);
+    report.setStartTime(startTime);
+    report.setFinishTime(finishTime);
+    report.setSetupProgress(setupProgress);
+    report.setCleanupProgress(cleanupProgress);
+    report.setMapProgress(mapProgress);
+    report.setReduceProgress(reduceProgress);
+    return report;
+  }
 }
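
A minimal sketch, not part of the patch, of the new newJobReport factory in use; `appId`, the job name, user and progress values are placeholders:

    // Sketch only: a synthetic, half-done job report for a test double.
    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
    JobReport report = MRBuilderUtils.newJobReport(jobId, "job", "user",
        JobState.RUNNING, 0L, 0L,
        1.0f /* setup */, 0.5f /* map */, 0.0f /* reduce */, 0.0f /* cleanup */);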

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml

@@ -64,6 +64,12 @@
       <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-common</artifactId>

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml

@@ -88,6 +88,12 @@
         <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
         <version>${yarn.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+        <version>${yarn.version}</version>
+        <type>test-jar</type> 
+      </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-mapreduce-client-core</artifactId>

+ 23 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java

@@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.util;
 
 import java.net.URI;
 import java.util.Comparator;
+import java.util.List;
 
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -184,6 +186,13 @@ public class BuilderUtils {
     return id;
   }
 
+  public static NodeId newNodeId(String host, int port) {
+    NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
+    nodeId.setHost(host);
+    nodeId.setPort(port);
+    return nodeId;
+  }
+
   public static Container newContainer(RecordFactory recordFactory,
       ApplicationAttemptId appAttemptId, int containerId, NodeId nodeId,
       String nodeHttpAddress, Resource resource, Priority priority) {
@@ -266,5 +275,18 @@ public class BuilderUtils {
     url.setFile(file);
     return url;
   }
-  
+
+  public static AllocateRequest newAllocateRequest(
+      ApplicationAttemptId applicationAttemptId, int responseID,
+      float appProgress, List<ResourceRequest> resourceAsk,
+      List<ContainerId> containersToBeReleased) {
+    AllocateRequest allocateRequest = recordFactory
+        .newRecordInstance(AllocateRequest.class);
+    allocateRequest.setApplicationAttemptId(applicationAttemptId);
+    allocateRequest.setResponseId(responseID);
+    allocateRequest.setProgress(appProgress);
+    allocateRequest.addAllAsks(resourceAsk);
+    allocateRequest.addAllReleases(containersToBeReleased);
+    return allocateRequest;
+  }
 }
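
newAllocateRequest collapses the request-building boilerplate that the MockAM and token-test hunks below previously spelled out. A minimal illustration, not part of the patch, with placeholder arguments:

    // Sketch only; every argument here is a placeholder.
    AllocateRequest heartbeat = BuilderUtils.newAllocateRequest(
        appAttemptId,       // ApplicationAttemptId of the running AM
        ++lastResponseId,   // last response id seen, bumped per heartbeat
        0.5f,               // reported application progress
        ask,                // List<ResourceRequest> being asked for
        release);           // List<ContainerId> to give back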

+ 14 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml

@@ -37,6 +37,20 @@
 
   <build>
     <plugins>
+
+      <!-- Publish tests jar -->
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+      </plugin>
+
       <plugin>
         <artifactId>maven-antrun-plugin</artifactId>
         <executions>

+ 5 - 7
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@@ -250,13 +251,10 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent> {
 
       if (rmContext.getRMApps().putIfAbsent(applicationId, application) != 
           null) {
-        LOG.info("Application with id " + applicationId + 
-            " is already present! Cannot add a duplicate!");
-        // don't send event through dispatcher as it will be handled by app 
-        // already present with this id.
-        application.handle(new RMAppRejectedEvent(applicationId,
-            "Application with this id is already present! " +
-            "Cannot add a duplicate!"));
+        String message = "Application with id " + applicationId
+            + " is already present! Cannot add a duplicate!";
+        LOG.info(message);
+        throw RPCUtil.getRemoteException(message);
       } else {
         this.rmContext.getDispatcher().getEventHandler().handle(
             new RMAppEvent(applicationId, RMAppEventType.START));

+ 12 - 4
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -98,7 +98,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   private ContainerAllocationExpirer containerAllocationExpirer;
   protected NMLivelinessMonitor nmLivelinessMonitor;
   protected NodesListManager nodesListManager;
-  private SchedulerEventDispatcher schedulerDispatcher;
+  private EventHandler<SchedulerEvent> schedulerDispatcher;
   protected RMAppManager rmAppManager;
 
   private WebApp webApp;
@@ -119,7 +119,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   @Override
   public synchronized void init(Configuration conf) {
 
-    this.rmDispatcher = new AsyncDispatcher();
+    this.rmDispatcher = createDispatcher();
     addIfService(this.rmDispatcher);
 
     this.containerAllocationExpirer = new ContainerAllocationExpirer(
@@ -138,8 +138,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
     this.conf = new YarnConfiguration(conf);
     // Initialize the scheduler
     this.scheduler = createScheduler();
-    this.schedulerDispatcher = new SchedulerEventDispatcher(this.scheduler);
-    addService(this.schedulerDispatcher);
+    this.schedulerDispatcher = createSchedulerEventDispatcher();
+    addIfService(this.schedulerDispatcher);
     this.rmDispatcher.register(SchedulerEventType.class,
         this.schedulerDispatcher);
 
@@ -195,6 +195,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
     super.init(conf);
   }
 
+  protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+    return new SchedulerEventDispatcher(this.scheduler);
+  }
+
+  protected Dispatcher createDispatcher() {
+    return new AsyncDispatcher();
+  }
+
   protected void addIfService(Object object) {
     if (object instanceof Service) {
       addService((Service) object);
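
Turning the dispatcher creation into protected factory methods lets a test subclass swap in synchronous dispatching instead of the async dispatcher thread. A hedged sketch of such an override, not part of the patch; the constructor signature and the visibility of the scheduler field are assumptions:

    // Sketch only: a test ResourceManager that dispatches scheduler events inline.
    class SyncTestRM extends ResourceManager {
      SyncTestRM(Store store) { super(store); }   // constructor form assumed

      @Override
      protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
        return new EventHandler<SchedulerEvent>() {
          @Override
          public void handle(SchedulerEvent event) {
            scheduler.handle(event);   // assumes the scheduler field is visible to subclasses
          }
        };
      }
    }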

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java

@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
  * look at {@link RMAppImpl} for its implementation. This interface 
  * exposes methods to access various updates in application status/report.
  */
-public interface RMApp extends EventHandler<RMAppEvent>{
+public interface RMApp extends EventHandler<RMAppEvent> {
 
   /**
    * The application id for this {@link RMApp}.

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java

@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
  * {@link YarnConfiguration#RM_AM_MAX_RETRIES}. For specific 
  * implementation take a look at {@link RMAppAttemptImpl}.
  */
-public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent>{
+public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
 
   /**
    * Get the application attempt id for this {@link RMAppAttempt}.

+ 2 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -685,6 +685,8 @@ public class RMAppAttemptImpl implements RMAppAttempt {
     public void transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
 
+      appAttempt.progress = 1.0f;
+
       // Tell the app and the scheduler
       super.transition(appAttempt, event);
 

+ 7 - 8
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java

@@ -207,13 +207,18 @@ public class SchedulerApp {
         .getDispatcher().getEventHandler(), this.rmContext
         .getContainerAllocationExpirer());
 
+    // Add it to allContainers list.
+    newlyAllocatedContainers.add(rmContainer);
+    liveContainers.put(container.getId(), rmContainer);    
+
     // Update consumption and track allocations
-    
+    appSchedulingInfo.allocate(type, node, priority, request, container);
+    Resources.addTo(currentConsumption, container.getResource());
+
     // Inform the container
     rmContainer.handle(
         new RMContainerEvent(container.getId(), RMContainerEventType.START));
 
-    Resources.addTo(currentConsumption, container.getResource());
     if (LOG.isDebugEnabled()) {
       LOG.debug("allocate: applicationAttemptId=" 
           + container.getId().getApplicationAttemptId() 
@@ -223,12 +228,6 @@ public class SchedulerApp {
     RMAuditLogger.logSuccess(getUser(), 
         AuditConstants.ALLOC_CONTAINER, "SchedulerApp", 
         getApplicationId(), container.getId());
-
-    // Add it to allContainers list.
-    newlyAllocatedContainers.add(rmContainer);
-    liveContainers.put(container.getId(), rmContainer);
-    
-    appSchedulingInfo.allocate(type, node, priority, request, container);
     
     return rmContainer;
   }

+ 0 - 3
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java

@@ -19,10 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
 
 import java.util.List;
-import java.util.Map;
 
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 

+ 2 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -291,7 +291,7 @@ public class FifoScheduler implements ResourceScheduler {
   
   @SuppressWarnings("unchecked")
   private synchronized void addApplication(ApplicationAttemptId appAttemptId,
-      String queueName, String user) {
+      String user) {
     // TODO: Fix store
     SchedulerApp schedulerApp = 
         new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, 
@@ -628,7 +628,7 @@ public class FifoScheduler implements ResourceScheduler {
     {
       AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
       addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
-          .getQueue(), appAddedEvent.getUser());
+          .getUser());
     }
     break;
     case APP_REMOVED:

+ 4 - 6
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 
 public class MockAM {
@@ -128,7 +129,7 @@ public class MockAM {
     req.setHostName(resource);
     req.setNumContainers(containers);
     Priority pri = Records.newRecord(Priority.class);
-    pri.setPriority(1);
+    pri.setPriority(priority);
     req.setPriority(pri);
     Resource capability = Records.newRecord(Resource.class);
     capability.setMemory(memory);
@@ -139,11 +140,8 @@ public class MockAM {
   public AMResponse allocate(
       List<ResourceRequest> resourceRequest, List<ContainerId> releases) 
       throws Exception {
-    AllocateRequest req = Records.newRecord(AllocateRequest.class);
-    req.setResponseId(++responseId);
-    req.setApplicationAttemptId(attemptId);
-    req.addAllAsks(resourceRequest);
-    req.addAllReleases(releases);
+    AllocateRequest req = BuilderUtils.newAllocateRequest(attemptId,
+        ++responseId, 0F, resourceRequest, releases);
     AllocateResponse resp = amRMProtocol.allocate(req);
     return resp.getAMResponse();
   }

+ 8 - 5
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -77,13 +78,14 @@ public class TestAMRMRPCResponseId {
 
     am.registerAppAttempt();
     
-    AllocateRequest allocateRequest = recordFactory.newRecordInstance(AllocateRequest.class);
-    allocateRequest.setApplicationAttemptId(attempt.getAppAttemptId());
+    AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(attempt
+        .getAppAttemptId(), 0, 0F, null, null);
 
     AMResponse response = amService.allocate(allocateRequest).getAMResponse();
     Assert.assertEquals(1, response.getResponseId());
     Assert.assertFalse(response.getReboot());
-    allocateRequest.setResponseId(response.getResponseId());
+    allocateRequest = BuilderUtils.newAllocateRequest(attempt
+        .getAppAttemptId(), response.getResponseId(), 0F, null, null);
     
     response = amService.allocate(allocateRequest).getAMResponse();
     Assert.assertEquals(2, response.getResponseId());
@@ -91,8 +93,9 @@ public class TestAMRMRPCResponseId {
     response = amService.allocate(allocateRequest).getAMResponse();
     Assert.assertEquals(2, response.getResponseId());
     
-    /** try sending old **/
-    allocateRequest.setResponseId(0);
+    /** try sending old request again **/
+    allocateRequest = BuilderUtils.newAllocateRequest(attempt
+        .getAppAttemptId(), 0, 0F, null, null);
     response = amService.allocate(allocateRequest).getAMResponse();
     Assert.assertTrue(response.getReboot());
   }

+ 1 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java

@@ -162,6 +162,7 @@ public class MockRMApp implements RMApp {
     this.diagnostics  = new StringBuilder(diag);
   }
 
+  @Override
   public void handle(RMAppEvent event) {
   }
 

+ 3 - 6
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java

@@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.BeforeClass;
 import org.junit.AfterClass;
@@ -240,12 +241,8 @@ public class TestContainerTokenSecretManager {
     ask.add(rr);
     ArrayList<ContainerId> release = new ArrayList<ContainerId>();
     
-    AllocateRequest allocateRequest =
-        recordFactory.newRecordInstance(AllocateRequest.class);
-    allocateRequest.setApplicationAttemptId(appAttempt.getAppAttemptId());
-    allocateRequest.setResponseId(0);
-    allocateRequest.addAllAsks(ask);
-    allocateRequest.addAllReleases(release);
+    AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
+        appAttempt.getAppAttemptId(), 0, 0F, ask, release);
     List<Container> allocatedContainers = scheduler.allocate(allocateRequest)
         .getAMResponse().getAllocatedContainers();