Browse Source

Merge r1469042 through r1469643 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1469669 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 12 years ago
parent
commit
b10f1d36d6
53 changed files with 1223 additions and 472 deletions
  1. 3 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 12 12
      hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.cmd
  3. 12 9
      hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.sh
  4. 15 0
      hadoop-mapreduce-project/CHANGES.txt
  5. 48 51
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  6. 2 6
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
  7. 16 17
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
  8. 40 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
  9. 2 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
  10. 9 5
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
  11. 1 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
  12. 4 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
  13. 17 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
  14. 9 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
  15. 90 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
  16. 79 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputFormat.java
  17. 2 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
  18. 15 0
      hadoop-yarn-project/CHANGES.txt
  19. 8 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/YarnApplicationState.java
  20. 7 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
  21. 0 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
  22. 0 14
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
  23. 6 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
  24. 14 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
  25. 49 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
  26. 35 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java
  27. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
  28. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
  29. 44 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  30. 9 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
  31. 36 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppStoredEvent.java
  32. 10 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
  33. 23 36
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
  34. 35 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
  35. 32 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
  36. 7 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
  37. 14 29
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  38. 29 25
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
  39. 2 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
  40. 0 118
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java
  41. 145 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
  42. 9 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
  43. 8 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
  44. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
  45. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
  46. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
  47. 108 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
  48. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
  49. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
  50. 93 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
  51. 0 59
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java
  52. 109 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
  53. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -546,6 +546,9 @@ Release 2.0.5-beta - UNRELEASED
     HADOOP-9401. CodecPool: Add counters for number of (de)compressors 
     leased out. (kkambatl via tucu)
 
+    HADOOP-9450. HADOOP_USER_CLASSPATH_FIRST is not honored; CLASSPATH
+    is PREpended instead of APpended. (Chris Nauroth and harsh via harsh)
+
   OPTIMIZATIONS
 
     HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs

+ 12 - 12
hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.cmd

@@ -145,18 +145,6 @@ if exist %HADOOP_COMMON_HOME%\%HADOOP_COMMON_LIB_JARS_DIR% (
 
 set CLASSPATH=!CLASSPATH!;%HADOOP_COMMON_HOME%\%HADOOP_COMMON_DIR%\*
 
-@rem
-@rem add user-specified CLASSPATH last
-@rem
-
-if defined HADOOP_CLASSPATH (
-  if defined HADOOP_USER_CLASSPATH_FIRST (
-    set CLASSPATH=%HADOOP_CLASSPATH%;%CLASSPATH%;
-  ) else (
-    set CLASSPATH=%CLASSPATH%;%HADOOP_CLASSPATH%;
-  )
-)
-
 @rem
 @rem default log directory % file
 @rem
@@ -289,4 +277,16 @@ if not "%HADOOP_MAPRED_HOME%\%MAPRED_DIR%" == "%HADOOP_YARN_HOME%\%YARN_DIR%" (
   set CLASSPATH=!CLASSPATH!;%HADOOP_MAPRED_HOME%\%MAPRED_DIR%\*
 )
 
+@rem
+@rem add user-specified CLASSPATH last
+@rem
+
+if defined HADOOP_CLASSPATH (
+  if defined HADOOP_USER_CLASSPATH_FIRST (
+    set CLASSPATH=%HADOOP_CLASSPATH%;%CLASSPATH%;
+  ) else (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_CLASSPATH%;
+  )
+)
+
 :eof

+ 12 - 9
hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.sh

@@ -158,10 +158,6 @@ fi
 # CLASSPATH initially contains $HADOOP_CONF_DIR
 CLASSPATH="${HADOOP_CONF_DIR}"
 
-if [ "$HADOOP_USER_CLASSPATH_FIRST" != "" ] && [ "$HADOOP_CLASSPATH" != "" ] ; then
-  CLASSPATH=${CLASSPATH}:${HADOOP_CLASSPATH}
-fi
-
 # so that filenames w/ spaces are handled correctly in loops below
 IFS=
 
@@ -182,11 +178,6 @@ fi
 
 CLASSPATH=${CLASSPATH}:$HADOOP_COMMON_HOME/$HADOOP_COMMON_DIR'/*'
 
-# add user-specified CLASSPATH last
-if [ "$HADOOP_USER_CLASSPATH_FIRST" = "" ] && [ "$HADOOP_CLASSPATH" != "" ]; then
-  CLASSPATH=${CLASSPATH}:${HADOOP_CLASSPATH}
-fi
-
 # default log directory & file
 if [ "$HADOOP_LOG_DIR" = "" ]; then
   HADOOP_LOG_DIR="$HADOOP_PREFIX/logs"
@@ -285,3 +276,15 @@ if [ "$HADOOP_MAPRED_HOME/$MAPRED_DIR" != "$HADOOP_YARN_HOME/$YARN_DIR" ] ; then
 
   CLASSPATH=${CLASSPATH}:$HADOOP_MAPRED_HOME/$MAPRED_DIR'/*'
 fi
+
+# Add the user-specified CLASSPATH via HADOOP_CLASSPATH
+# Add it first or last depending on if user has
+# set env-var HADOOP_USER_CLASSPATH_FIRST
+if [ "$HADOOP_CLASSPATH" != "" ]; then
+  # Prefix it if its to be preceded
+  if [ "$HADOOP_USER_CLASSPATH_FIRST" != "" ]; then
+    CLASSPATH=${HADOOP_CLASSPATH}:${CLASSPATH}
+  else
+    CLASSPATH=${CLASSPATH}:${HADOOP_CLASSPATH}
+  fi
+fi

+ 15 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -198,6 +198,10 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-4985. Add compression option to TestDFSIO usage.
     (Plamen Jeliazkov via shv)
 
+    MAPREDUCE-5152. Make MR App to simply pass through the container from RM
+    instead of extracting and populating information itself to start any
+    container. (vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4974. Optimising the LineRecordReader initialize() method 
@@ -305,6 +309,17 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5151. Update MR AM to use standard exit codes from the API after
     YARN-444. (Sandy Ryza via vinodkv)
 
+    MAPREDUCE-5140. MR part of YARN-514 (Zhijie Shen via bikas)
+
+    MAPREDUCE-5128. mapred-default.xml is missing a bunch of history server 
+    configs. (sandyr via tucu)
+
+    MAPREDUCE-4898. FileOutputFormat.checkOutputSpecs and 
+    FileOutputFormat.setOutputPath incompatible with MR1. (rkanter via tucu)
+
+    MAPREDUCE-4932. mapreduce.job#getTaskCompletionEvents incompatible with 
+    Hadoop 1. (rkanter via tucu)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 48 - 51
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -117,7 +117,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -490,14 +489,10 @@ public abstract class TaskAttemptImpl implements
          <TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
     stateMachine;
 
-  private ContainerId containerID;
-  private NodeId containerNodeId;
-  private String containerMgrAddress;
-  private String nodeHttpAddress;
+  @VisibleForTesting
+  public Container container;
   private String nodeRackName;
   private WrappedJvmID jvmID;
-  private ContainerToken containerToken;
-  private Resource assignedCapability;
   
   //this takes good amount of memory ~ 30KB. Instantiate it lazily
   //and make it null once task is launched.
@@ -825,7 +820,7 @@ public abstract class TaskAttemptImpl implements
   public ContainerId getAssignedContainerID() {
     readLock.lock();
     try {
-      return containerID;
+      return container == null ? null : container.getId();
     } finally {
       readLock.unlock();
     }
@@ -835,7 +830,8 @@ public abstract class TaskAttemptImpl implements
   public String getAssignedContainerMgrAddress() {
     readLock.lock();
     try {
-      return containerMgrAddress;
+      return container == null ? null : StringInterner.weakIntern(container
+        .getNodeId().toString());
     } finally {
       readLock.unlock();
     }
@@ -895,7 +891,7 @@ public abstract class TaskAttemptImpl implements
   public NodeId getNodeId() {
     readLock.lock();
     try {
-      return containerNodeId;
+      return container == null ? null : container.getNodeId();
     } finally {
       readLock.unlock();
     }
@@ -907,7 +903,7 @@ public abstract class TaskAttemptImpl implements
   public String getNodeHttpAddress() {
     readLock.lock();
     try {
-      return nodeHttpAddress;
+      return container == null ? null : container.getNodeHttpAddress();
     } finally {
       readLock.unlock();
     }
@@ -967,8 +963,8 @@ public abstract class TaskAttemptImpl implements
       result.setContainerId(this.getAssignedContainerID());
       result.setNodeManagerHost(trackerName);
       result.setNodeManagerHttpPort(httpPort);
-      if (this.containerNodeId != null) {
-        result.setNodeManagerPort(this.containerNodeId.getPort());
+      if (this.container != null) {
+        result.setNodeManagerPort(this.container.getNodeId().getPort());
       }
       return result;
     } finally {
@@ -1093,13 +1089,17 @@ public abstract class TaskAttemptImpl implements
   @SuppressWarnings("unchecked")
   public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo,
       OutputCommitter committer, boolean recoverOutput) {
-    containerID = taInfo.getContainerId();
-    containerNodeId = ConverterUtils.toNodeId(taInfo.getHostname() + ":"
+    ContainerId containerId = taInfo.getContainerId();
+    NodeId containerNodeId = ConverterUtils.toNodeId(taInfo.getHostname() + ":"
         + taInfo.getPort());
-    containerMgrAddress = StringInterner.weakIntern(
-        containerNodeId.toString());
-    nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
+    String nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
         + taInfo.getHttpPort());
+    // Resource/Priority/Tokens are only needed while launching the
+    // container on an NM, these are already completed tasks, so setting them to
+    // null
+    container =
+        BuilderUtils.newContainer(containerId, containerNodeId,
+          nodeHttpAddress, null, null, null);
     computeRackAndLocality();
     launchTime = taInfo.getStartTime();
     finishTime = (taInfo.getFinishTime() != -1) ?
@@ -1227,6 +1227,7 @@ public abstract class TaskAttemptImpl implements
   }
 
   private void computeRackAndLocality() {
+    NodeId containerNodeId = container.getNodeId();
     nodeRackName = RackResolver.resolve(
         containerNodeId.getHost()).getNetworkLocation();
 
@@ -1331,10 +1332,10 @@ public abstract class TaskAttemptImpl implements
             TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId()
                 .getTaskType()), attemptState.toString(),
             taskAttempt.finishTime,
-            taskAttempt.containerNodeId == null ? "UNKNOWN"
-                : taskAttempt.containerNodeId.getHost(),
-            taskAttempt.containerNodeId == null ? -1 
-                : taskAttempt.containerNodeId.getPort(),    
+            taskAttempt.container == null ? "UNKNOWN"
+                : taskAttempt.container.getNodeId().getHost(),
+            taskAttempt.container == null ? -1 
+                : taskAttempt.container.getNodeId().getPort(),    
             taskAttempt.nodeRackName == null ? "UNKNOWN" 
                 : taskAttempt.nodeRackName,
             StringUtils.join(
@@ -1353,12 +1354,12 @@ public abstract class TaskAttemptImpl implements
     eventHandler.handle(jce);
 
     LOG.info("TaskAttempt: [" + attemptId
-        + "] using containerId: [" + containerID + " on NM: ["
-        + containerMgrAddress + "]");
+        + "] using containerId: [" + container.getId() + " on NM: ["
+        + StringInterner.weakIntern(container.getNodeId().toString()) + "]");
     TaskAttemptStartedEvent tase =
       new TaskAttemptStartedEvent(TypeConverter.fromYarn(attemptId),
           TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
-          launchTime, trackerName, httpPort, shufflePort, containerID,
+          launchTime, trackerName, httpPort, shufflePort, container.getId(),
           locality.toString(), avataar.toString());
     eventHandler.handle(
         new JobHistoryEvent(attemptId.getTaskId().getJobId(), tase));
@@ -1490,19 +1491,14 @@ public abstract class TaskAttemptImpl implements
         TaskAttemptEvent event) {
       final TaskAttemptContainerAssignedEvent cEvent = 
         (TaskAttemptContainerAssignedEvent) event;
-      taskAttempt.containerID = cEvent.getContainer().getId();
-      taskAttempt.containerNodeId = cEvent.getContainer().getNodeId();
-      taskAttempt.containerMgrAddress = StringInterner.weakIntern(
-          taskAttempt.containerNodeId.toString());
-      taskAttempt.nodeHttpAddress = StringInterner.weakIntern(
-          cEvent.getContainer().getNodeHttpAddress());
-      taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
-      taskAttempt.assignedCapability = cEvent.getContainer().getResource();
+      Container container = cEvent.getContainer();
+      taskAttempt.container = container;
       // this is a _real_ Task (classic Hadoop mapred flavor):
       taskAttempt.remoteTask = taskAttempt.createRemoteTask();
-      taskAttempt.jvmID = new WrappedJvmID(
-          taskAttempt.remoteTask.getTaskID().getJobID(), 
-          taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
+      taskAttempt.jvmID =
+          new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(),
+            taskAttempt.remoteTask.isMapTask(), taskAttempt.container.getId()
+              .getId());
       taskAttempt.taskAttemptListener.registerPendingTask(
           taskAttempt.remoteTask, taskAttempt.jvmID);
 
@@ -1514,10 +1510,9 @@ public abstract class TaskAttemptImpl implements
           cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
           taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
           taskAttempt.taskAttemptListener, taskAttempt.credentials);
-      taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent(
-          taskAttempt.attemptId, taskAttempt.containerID,
-          taskAttempt.containerMgrAddress, taskAttempt.containerToken,
-          launchContext, taskAttempt.assignedCapability, taskAttempt.remoteTask));
+      taskAttempt.eventHandler
+        .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
+          launchContext, container, taskAttempt.remoteTask));
 
       // send event to speculator that our container needs are satisfied
       taskAttempt.eventHandler.handle
@@ -1604,9 +1599,8 @@ public abstract class TaskAttemptImpl implements
       taskAttempt.taskAttemptListener
         .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
       //TODO Resolve to host / IP in case of a local address.
-      InetSocketAddress nodeHttpInetAddr =
-          NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:
-                                                                  // Costly?
+      InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
+          NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
       taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
       taskAttempt.httpPort = nodeHttpInetAddr.getPort();
       taskAttempt.sendLaunchedEvents();
@@ -1713,6 +1707,10 @@ public abstract class TaskAttemptImpl implements
   private void logAttemptFinishedEvent(TaskAttemptStateInternal state) {
     //Log finished events only if an attempt started.
     if (getLaunchTime() == 0) return; 
+    String containerHostName = this.container == null ? "UNKNOWN"
+         : this.container.getNodeId().getHost();
+    int containerNodePort =
+        this.container == null ? -1 : this.container.getNodeId().getPort();
     if (attemptId.getTaskId().getTaskType() == TaskType.MAP) {
       MapAttemptFinishedEvent mfe =
          new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
@@ -1720,9 +1718,8 @@ public abstract class TaskAttemptImpl implements
          state.toString(),
          this.reportedStatus.mapFinishTime,
          finishTime,
-         this.containerNodeId == null ? "UNKNOWN"
-             : this.containerNodeId.getHost(),
-         this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
+         containerHostName,
+         containerNodePort,
          this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
          this.reportedStatus.stateString,
          getCounters(),
@@ -1737,9 +1734,8 @@ public abstract class TaskAttemptImpl implements
          this.reportedStatus.shuffleFinishTime,
          this.reportedStatus.sortFinishTime,
          finishTime,
-         this.containerNodeId == null ? "UNKNOWN"
-             : this.containerNodeId.getHost(),
-         this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
+         containerHostName,
+         containerNodePort,
          this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
          this.reportedStatus.stateString,
          getCounters(),
@@ -1864,8 +1860,9 @@ public abstract class TaskAttemptImpl implements
       //send the cleanup event to containerLauncher
       taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
           taskAttempt.attemptId, 
-          taskAttempt.containerID, taskAttempt.containerMgrAddress,
-          taskAttempt.containerToken,
+          taskAttempt.container.getId(), StringInterner
+              .weakIntern(taskAttempt.container.getNodeId().toString()),
+          taskAttempt.container.getContainerToken(),
           ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
     }
   }

+ 2 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java

@@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.api.records.ContainerToken;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ProtoUtils;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -149,16 +148,13 @@ public class ContainerLauncherImpl extends AbstractService implements
 
         // Construct the actual Container
         ContainerLaunchContext containerLaunchContext =
-          event.getContainer();
+          event.getContainerLaunchContext();
 
-        org.apache.hadoop.yarn.api.records.Container container =
-            BuilderUtils.newContainer(containerID, null, null,
-                event.getResource(), null, containerToken);
         // Now launch the actual container
         StartContainerRequest startRequest = Records
           .newRecord(StartContainerRequest.class);
         startRequest.setContainerLaunchContext(containerLaunchContext);
-        startRequest.setContainer(container);
+        startRequest.setContainer(event.getAllocatedContainer());
         StartContainerResponse response = proxy.startContainer(startRequest);
 
         ByteBuffer portInfo = response

+ 16 - 17
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java

@@ -20,35 +20,34 @@ package org.apache.hadoop.mapreduce.v2.app.launcher;
 
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerToken;
-import org.apache.hadoop.yarn.api.records.Resource;
 
 public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
 
-  private final ContainerLaunchContext container;
+  private final Container allocatedContainer;
+  private final ContainerLaunchContext containerLaunchContext;
   private final Task task;
-  private final Resource resource;
 
   public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
-      ContainerId containerID, String containerMgrAddress,
-      ContainerToken containerToken,
-      ContainerLaunchContext containerLaunchContext, Resource resource,
-      Task remoteTask) {
-    super(taskAttemptID, containerID, containerMgrAddress, containerToken,
-        ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
-    this.container = containerLaunchContext;
+      ContainerLaunchContext containerLaunchContext,
+      Container allocatedContainer, Task remoteTask) {
+    super(taskAttemptID, allocatedContainer.getId(), StringInterner
+      .weakIntern(allocatedContainer.getNodeId().toString()),
+      allocatedContainer.getContainerToken(),
+      ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
+    this.allocatedContainer = allocatedContainer;
+    this.containerLaunchContext = containerLaunchContext;
     this.task = remoteTask;
-    this.resource = resource;
   }
 
-  public ContainerLaunchContext getContainer() {
-    return this.container;
+  public ContainerLaunchContext getContainerLaunchContext() {
+    return this.containerLaunchContext;
   }
 
-  public Resource getResource() {
-    return this.resource;
+  public Container getAllocatedContainer() {
+    return this.allocatedContainer;
   }
 
   public Task getRemoteTask() {

+ 40 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java

@@ -23,6 +23,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 
 import junit.framework.Assert;
@@ -46,6 +47,11 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -411,7 +417,40 @@ public class TestMRApp {
       TypeConverter.fromYarn(state);
     }
   }
-  
+
+  private Container containerObtainedByContainerLauncher;
+  @Test
+  public void testContainerPassThrough() throws Exception {
+    MRApp app = new MRApp(0, 1, true, this.getClass().getName(), true) {
+      @Override
+      protected ContainerLauncher createContainerLauncher(AppContext context) {
+        return new MockContainerLauncher() {
+          @Override
+          public void handle(ContainerLauncherEvent event) {
+            if (event instanceof ContainerRemoteLaunchEvent) {
+              containerObtainedByContainerLauncher =
+                  ((ContainerRemoteLaunchEvent) event).getAllocatedContainer();
+            }
+            super.handle(event);
+          }
+        };
+      };
+    };
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+
+    Collection<Task> tasks = job.getTasks().values();
+    Collection<TaskAttempt> taskAttempts =
+        tasks.iterator().next().getAttempts().values();
+    TaskAttemptImpl taskAttempt =
+        (TaskAttemptImpl) taskAttempts.iterator().next();
+    // Container from RM should pass through to the launcher. Container object
+    // should be the same.
+   Assert.assertTrue(taskAttempt.container 
+     == containerObtainedByContainerLauncher);
+  }
+
   private final class MRAppWithHistory extends MRApp {
     public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
         String testName, boolean cleanOnStart, int startCount) {

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java

@@ -79,7 +79,8 @@ public class TestMapReduceChildJVM {
         public void handle(ContainerLauncherEvent event) {
           if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
             ContainerRemoteLaunchEvent launchEvent = (ContainerRemoteLaunchEvent) event;
-            ContainerLaunchContext launchContext = launchEvent.getContainer();
+            ContainerLaunchContext launchContext =
+                launchEvent.getContainerLaunchContext();
             String cmdString = launchContext.getCommands().toString();
             LOG.info("launchContext " + cmdString);
             myCommandLine = cmdString;

+ 9 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java

@@ -37,7 +37,6 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -224,10 +223,6 @@ public class TestContainerLauncher {
 
   @Test
   public void testSlowNM() throws Exception {
-    test();
-  }
-
-  private void test() throws Exception {
 
     conf = new Configuration();
     int maxAttempts = 1;
@@ -382,6 +377,15 @@ public class TestContainerLauncher {
     @Override
     public StartContainerResponse startContainer(StartContainerRequest request)
         throws YarnRemoteException {
+
+      // Validate that the container is what RM is giving.
+      Assert.assertEquals(MRApp.NM_HOST, request.getContainer().getNodeId()
+        .getHost());
+      Assert.assertEquals(MRApp.NM_PORT, request.getContainer().getNodeId()
+        .getPort());
+      Assert.assertEquals(MRApp.NM_HOST + ":" + MRApp.NM_HTTP_PORT, request
+        .getContainer().getNodeHttpAddress());
+
       StartContainerResponse response = recordFactory
           .newRecordInstance(StartContainerResponse.class);
       status = recordFactory.newRecordInstance(ContainerStatus.class);

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java

@@ -392,6 +392,7 @@ public class TypeConverter {
       FinalApplicationStatus finalApplicationStatus) {
     switch (yarnApplicationState) {
     case NEW:
+    case NEW_SAVING:
     case SUBMITTED:
     case ACCEPTED:
       return State.PREP;

+ 4 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java

@@ -23,6 +23,7 @@ import java.util.List;
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -48,6 +49,9 @@ public class TestTypeConverter {
     for (YarnApplicationState applicationState : YarnApplicationState.values()) {
       TypeConverter.fromYarn(applicationState, FinalApplicationStatus.FAILED);
     }
+    // ad hoc test of NEW_SAVING, which is newly added
+    Assert.assertEquals(State.PREP, TypeConverter.fromYarn(
+        YarnApplicationState.NEW_SAVING, FinalApplicationStatus.FAILED));
     
     for (TaskType taskType : TaskType.values()) {
       TypeConverter.fromYarn(taskType);

+ 17 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java

@@ -659,8 +659,24 @@ public class Job extends JobContextImpl implements JobContext {
             startFrom, numEvents); 
       }
     });
+  }
+
+  /**
+   * Get events indicating completion (success/failure) of component tasks.
+   *  
+   * @param startFrom index to start fetching events from
+   * @return an array of {@link TaskCompletionEvent}s
+   * @throws IOException
+   */
+  public TaskCompletionEvent[] getTaskCompletionEvents(final int startFrom) 
+      throws IOException {
+    try {
+      return getTaskCompletionEvents(startFrom, 10);
+    } catch (InterruptedException ie) {
+      throw new RuntimeException(ie);
     }
-  
+  }
+
   /**
    * Kill indicated task attempt.
    * 

+ 9 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java

@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.InvalidJobConfException;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -150,9 +150,14 @@ public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir
    * @param outputDir the {@link Path} of the output directory for 
    * the map-reduce job.
    */
-  public static void setOutputPath(Job job, Path outputDir) throws IOException {
-    outputDir = outputDir.getFileSystem(job.getConfiguration()).makeQualified(
-        outputDir);
+  public static void setOutputPath(Job job, Path outputDir) {
+    try {
+      outputDir = outputDir.getFileSystem(job.getConfiguration()).makeQualified(
+          outputDir);
+    } catch (IOException e) {
+        // Throw the IOException as a RuntimeException to be compatible with MR1
+        throw new RuntimeException(e);
+    }
     job.getConfiguration().set(FileOutputFormat.OUTDIR, outputDir.toString());
   }
 

+ 90 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -330,6 +330,14 @@
   <description>The max percent (0-1) of running tasks that
   can be speculatively re-executed at any time.</description>
 </property>
+
+<property>
+  <name>mapreduce.job.map.output.collector.class</name>
+  <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
+  <description>
+    It defines the MapOutputCollector implementation to use.
+  </description>
+</property>
  
 <property>
   <name>mapreduce.job.speculative.slowtaskthreshold</name>
@@ -1037,11 +1045,89 @@
 </property>
 
 <property>
-  <name>mapreduce.job.map.output.collector.class</name>
-  <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
-  <description>
-    It defines the MapOutputCollector implementation to use.
+  <name>mapreduce.jobhistory.intermediate-done-dir</name>
+  <value>${yarn.app.mapreduce.am.staging-dir}/history/done_intermediate</value>
+  <description></description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.done-dir</name>
+  <value>${yarn.app.mapreduce.am.staging-dir}/history/done</value>
+  <description></description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.cleaner.enable</name>
+  <value>true</value>
+  <description></description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.cleaner.interval-ms</name>
+  <value>86400000</value>
+  <description> How often the job history cleaner checks for files to delete, 
+  in milliseconds. Defaults to 86400000 (one day). Files are only deleted if
+  they are older than mapreduce.jobhistory.max-age-ms.
   </description>
 </property>
 
+<property>
+  <name>mapreduce.jobhistory.max-age-ms</name>
+  <value>604800000</value>
+  <description> Job history files older than this many milliseconds will
+  be deleted when the history cleaner runs. Defaults to 604800000 (1 week).
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.client.thread-count</name>
+  <value>10</value>
+  <description>The number of threads to handle client API requests</description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.datestring.cache.size</name>
+  <value>200000</value>
+  <description>Size of the date string cache. Effects the number of directories
+  which will be scanned to find a job.</description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.joblist.cache.size</name>
+  <value>20000</value>
+  <description>Size of the job list cache</description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.loadedjobs.cache.size</name>
+  <value>5</value>
+  <description>Size of the loaded job cache</description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.move.interval-ms</name>
+  <value>180000</value>
+  <description>Scan for history files to more from intermediate done dir to done
+  dir at this frequency.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.move.thread-count</name>
+  <value>3</value>
+  <description>The number of threads used to move files.</description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.store.class</name>
+  <value></value>
+  <description>The HistoryStorage class to use to cache history data.</description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.minicluster.fixed.ports</name>
+  <value>false</value>
+  <description>Whether to use fixed ports with the minicluster</description>
+</property>
+
 </configuration>

+ 79 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputFormat.java

@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class TestFileOutputFormat extends TestCase {
+
+  public void testSetOutputPathException() throws Exception {
+    Job job = Job.getInstance();
+    try {
+      // Give it an invalid filesystem so it'll throw an exception
+      FileOutputFormat.setOutputPath(job, new Path("foo:///bar"));
+      fail("Should have thrown a RuntimeException with an IOException inside");
+    }
+    catch (RuntimeException re) {
+      assertTrue(re.getCause() instanceof IOException);
+    }
+  }
+
+  public void testCheckOutputSpecsException() throws Exception {
+    Job job = Job.getInstance();
+    Path outDir = new Path(System.getProperty("test.build.data", "/tmp"),
+            "output");
+    FileSystem fs = outDir.getFileSystem(new Configuration());
+    // Create the output dir so it already exists and set it for the job
+    fs.mkdirs(outDir);
+    FileOutputFormat.setOutputPath(job, outDir);
+    // We don't need a "full" implementation of FileOutputFormat for this test
+    FileOutputFormat fof = new FileOutputFormat() {
+      @Override
+        public RecordWriter getRecordWriter(TaskAttemptContext job)
+              throws IOException, InterruptedException {
+          return null;
+        }
+    };
+    try {
+      try {
+        // This should throw a FileAlreadyExistsException because the outputDir
+        // already exists
+        fof.checkOutputSpecs(job);
+        fail("Should have thrown a FileAlreadyExistsException");
+      }
+      catch (FileAlreadyExistsException re) {
+        // correct behavior
+      }
+    }
+    finally {
+      // Cleanup
+      if (fs.exists(outDir)) {
+        fs.delete(outDir, true);
+      }
+    }
+  }
+}

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java

@@ -234,6 +234,8 @@ public class ClientServiceDelegate {
       throw RPCUtil.getRemoteException("User is not set in the application report");
     }
     if (application.getYarnApplicationState() == YarnApplicationState.NEW
+        || application.getYarnApplicationState() ==
+            YarnApplicationState.NEW_SAVING
         || application.getYarnApplicationState() == YarnApplicationState.SUBMITTED
         || application.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
       realProxy = null;

+ 15 - 0
hadoop-yarn-project/CHANGES.txt

@@ -87,6 +87,9 @@ Release 2.0.5-beta - UNRELEASED
 
   NEW FEATURES
 
+    YARN-482. FS: Extend SchedulingMode to intermediate queues. 
+    (kkambatl via tucu)
+
   IMPROVEMENTS
 
     YARN-365. Change NM heartbeat handling to not generate a scheduler event
@@ -145,6 +148,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-495. Changed NM reboot behaviour to be a simple resync - kill all
     containers  and re-register with RM. (Jian He via vinodkv)
 
+    YARN-514. Delayed store operations should not result in RM unavailability
+    for app submission (Zhijie Shen via bikas)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -241,6 +247,15 @@ Release 2.0.5-beta - UNRELEASED
     YARN-500. Fixed YARN webapps to not roll-over ports when explicitly asked
     to use non-ephemeral ports. (Kenji Kikushima via vinodkv)
 
+    YARN-518. Fair Scheduler's document link could be added to the hadoop 2.x 
+    main doc page. (sandyr via tucu)
+
+    YARN-476. ProcfsBasedProcessTree info message confuses users. 
+    (sandyr via tucu)
+
+    YARN-585. Fix failure in TestFairScheduler#testNotAllowSubmitApplication
+    caused by YARN-514. (Zhijie Shen via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 8 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/YarnApplicationState.java

@@ -30,9 +30,15 @@ public enum YarnApplicationState {
   /** Application which was just created. */
   NEW,
 
+  /** Application which is being saved. */
+  NEW_SAVING,
+
   /** Application which has been submitted. */
   SUBMITTED,
-  
+
+  /** Application has been accepted by the scheduler */
+  ACCEPTED,
+
   /** Application which is currently running. */
   RUNNING,
 
@@ -43,8 +49,5 @@ public enum YarnApplicationState {
   FAILED,
 
   /** Application which was terminated by a user or admin. */
-  KILLED,
-
-  /** Application has been accepted by the scheduler */
-  ACCEPTED
+  KILLED
 }

+ 7 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

@@ -72,12 +72,13 @@ message ContainerProto {
 
 enum YarnApplicationStateProto {
   NEW = 1;
-  SUBMITTED = 2;
-  RUNNING = 3;
-  FINISHED = 4;
-  FAILED = 5;
-  KILLED = 6;
-  ACCEPTED = 7;
+  NEW_SAVING = 2;
+  SUBMITTED = 3;
+  ACCEPTED = 4;
+  RUNNING = 5;
+  FINISHED = 6;
+  FAILED = 7;
+  KILLED = 8;
 }
 
 enum FinalApplicationStatusProto {

+ 0 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java

@@ -382,8 +382,6 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
       in = new BufferedReader(fReader);
     } catch (FileNotFoundException f) {
       // The process vanished in the interim!
-      LOG.info("The process " + pinfo.getPid()
-          + " may have finished in the interim.");
       return ret;
     }
 

+ 0 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -297,20 +297,6 @@ public class ClientRMService extends AbstractService implements
       // So call handle directly and do not send an event.
       rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
           .currentTimeMillis()));
-      
-      // If recovery is enabled then store the application information in a 
-      // blocking call so make sure that RM has stored the information needed 
-      // to restart the AM after RM restart without further client communication
-      RMStateStore stateStore = rmContext.getStateStore();
-      LOG.info("Storing Application with id " + applicationId);
-      try {
-        stateStore.storeApplication(rmContext.getRMApps().get(applicationId));
-      } catch (Exception e) {
-        // For HA this exception needs to be handled by giving up 
-        // master status if we got fenced
-        LOG.error("Failed to store application:" + applicationId, e);
-        ExitUtil.terminate(1, e);
-      }
 
       LOG.info("Application with id " + applicationId.getId() + 
           " submitted by user " + user);

+ 6 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java

@@ -232,7 +232,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
 
   @SuppressWarnings("unchecked")
   protected void submitApplication(
-      ApplicationSubmissionContext submissionContext, long submitTime) {
+      ApplicationSubmissionContext submissionContext, long submitTime,
+      boolean isRecovered) {
     ApplicationId applicationId = submissionContext.getApplicationId();
     RMApp application = null;
     try {
@@ -278,7 +279,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
       
       // All done, start the RMApp
       this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMAppEvent(applicationId, RMAppEventType.START));
+          new RMAppEvent(applicationId, isRecovered ? RMAppEventType.RECOVER:
+            RMAppEventType.START));
     } catch (IOException ie) {
         LOG.info("RMAppManager submit application exception", ie);
         if (application != null) {
@@ -347,7 +349,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
       if(shouldRecover) {
         LOG.info("Recovering application " + appState.getAppId());
         submitApplication(appState.getApplicationSubmissionContext(), 
-                        appState.getSubmitTime());
+                        appState.getSubmitTime(), true);
         // re-populate attempt information in application
         RMAppImpl appImpl = (RMAppImpl) rmContext.getRMApps().get(
                                                         appState.getAppId());
@@ -378,7 +380,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
         ApplicationSubmissionContext submissionContext = 
             ((RMAppManagerSubmitEvent)event).getSubmissionContext();
         long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
-        submitApplication(submissionContext, submitTime);
+        submitApplication(submissionContext, submitTime, false);
       }
       break;
       default:

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -63,6 +65,11 @@ public class MemoryRMStateStore extends RMStateStore {
     ApplicationState appState = new ApplicationState(
                          appStateData.getSubmitTime(), 
                          appStateData.getApplicationSubmissionContext());
+    if (state.appState.containsKey(appState.getAppId())) {
+      Exception e = new IOException("App: " + appId + " is already stored.");
+      LOG.info("Error storing info for app: " + appId, e);
+      throw e;
+    }
     state.appState.put(appState.getAppId(), appState);
   }
 
@@ -79,6 +86,13 @@ public class MemoryRMStateStore extends RMStateStore {
         attemptState.getAttemptId().getApplicationId());
     assert appState != null;
 
+    if (appState.attempts.containsKey(attemptState.getAttemptId())) {
+      Exception e = new IOException("Attempt: " +
+          attemptState.getAttemptId() + " is already stored.");
+      LOG.info("Error storing info for attempt: " +
+          attemptState.getAttemptId(), e);
+      throw e;
+    }
     appState.attempts.put(attemptState.getAttemptId(), attemptState);
   }
 

+ 49 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
 
@@ -166,21 +167,19 @@ public abstract class RMStateStore {
   public abstract RMState loadState() throws Exception;
   
   /**
-   * Blocking API
+   * Non-Blocking API
    * ResourceManager services use this to store the application's state
-   * This must not be called on the dispatcher thread
+   * This does not block the dispatcher threads
+   * RMAppStoredEvent will be sent on completion to notify the RMApp
    */
-  public synchronized void storeApplication(RMApp app) throws Exception {
+  @SuppressWarnings("unchecked")
+  public synchronized void storeApplication(RMApp app) {
     ApplicationSubmissionContext context = app
                                             .getApplicationSubmissionContext();
     assert context instanceof ApplicationSubmissionContextPBImpl;
-
-    ApplicationStateDataPBImpl appStateData = new ApplicationStateDataPBImpl();
-    appStateData.setSubmitTime(app.getSubmitTime());
-    appStateData.setApplicationSubmissionContext(context);
-
-    LOG.info("Storing info for app: " + context.getApplicationId());
-    storeApplicationState(app.getApplicationId().toString(), appStateData);
+    ApplicationState appState = new ApplicationState(
+        app.getSubmitTime(), context);
+    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
   }
     
   /**
@@ -255,6 +254,30 @@ public abstract class RMStateStore {
   
   private synchronized void handleStoreEvent(RMStateStoreEvent event) {
     switch(event.getType()) {
+      case STORE_APP:
+        {
+          ApplicationState apptState =
+              ((RMStateStoreAppEvent) event).getAppState();
+          Exception storedException = null;
+          ApplicationStateDataPBImpl appStateData =
+              new ApplicationStateDataPBImpl();
+          appStateData.setSubmitTime(apptState.getSubmitTime());
+          appStateData.setApplicationSubmissionContext(
+              apptState.getApplicationSubmissionContext());
+          ApplicationId appId =
+              apptState.getApplicationSubmissionContext().getApplicationId();
+
+          LOG.info("Storing info for app: " + appId);
+          try {
+            storeApplicationState(appId.toString(), appStateData);
+          } catch (Exception e) {
+            LOG.error("Error storing app: " + appId, e);
+            storedException = e;
+          } finally {
+            notifyDoneStoringApplication(appId, storedException);
+          }
+        }
+        break;
       case STORE_APP_ATTEMPT:
         {
           ApplicationAttemptState attemptState = 
@@ -297,11 +320,25 @@ public abstract class RMStateStore {
         LOG.error("Unknown RMStateStoreEvent type: " + event.getType());
     }
   }
+
+  @SuppressWarnings("unchecked")
+  /**
+   * In (@link handleStoreEvent}, this method is called to notify the
+   * application about operation completion
+   * @param appId id of the application that has been saved
+   * @param storedException the exception that is thrown when storing the
+   * application
+   */
+  private void notifyDoneStoringApplication(ApplicationId appId,
+                                                  Exception storedException) {
+    rmDispatcher.getEventHandler().handle(
+        new RMAppStoredEvent(appId, storedException));
+  }
   
   @SuppressWarnings("unchecked")
   /**
-   * In (@link storeApplicationAttempt}, derived class can call this method to
-   * notify the application attempt about operation completion 
+   * In (@link handleStoreEvent}, this method is called to notify the
+   * application attempt about operation completion
    * @param appAttempt attempt that has been saved
    */
   private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,

+ 35 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreAppEvent.java

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+
+public class RMStateStoreAppEvent extends RMStateStoreEvent {
+
+  private final ApplicationState appState;
+
+  public RMStateStoreAppEvent(ApplicationState appState) {
+    super(RMStateStoreEventType.STORE_APP);
+    this.appState = appState;
+  }
+
+  public ApplicationState getAppState() {
+    return appState;
+  }
+}

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java

@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
 public enum RMStateStoreEventType {
   STORE_APP_ATTEMPT,
+  STORE_APP,
   REMOVE_APP
 }

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java

@@ -21,11 +21,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 public enum RMAppEventType {
   // Source: ClientRMService
   START,
+  RECOVER,
   KILL,
 
   // Source: RMAppAttempt
   APP_REJECTED,
   APP_ACCEPTED,
+  APP_SAVED,
   ATTEMPT_REGISTERED,
   ATTEMPT_FINISHING,
   ATTEMPT_FINISHED, // Will send the final state

+ 44 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -118,13 +119,25 @@ public class RMAppImpl implements RMApp, Recoverable {
      // Transitions from NEW state
     .addTransition(RMAppState.NEW, RMAppState.NEW,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
+        RMAppEventType.START, new RMAppSavingTransition())
     .addTransition(RMAppState.NEW, RMAppState.SUBMITTED,
-        RMAppEventType.START, new StartAppAttemptTransition())
+        RMAppEventType.RECOVER, new StartAppAttemptTransition())
     .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
         new AppKilledTransition())
     .addTransition(RMAppState.NEW, RMAppState.FAILED,
         RMAppEventType.APP_REJECTED, new AppRejectedTransition())
 
+    // Transitions from NEW_SAVING state
+    .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
+        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+    .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
+        RMAppEventType.APP_SAVED, new StartAppAttemptTransition())
+    .addTransition(RMAppState.NEW_SAVING, RMAppState.KILLED,
+        RMAppEventType.KILL, new AppKilledTransition())
+    .addTransition(RMAppState.NEW_SAVING, RMAppState.FAILED,
+        RMAppEventType.APP_REJECTED, new AppRejectedTransition())
+
      // Transitions from SUBMITTED state
     .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
@@ -182,7 +195,7 @@ public class RMAppImpl implements RMApp, Recoverable {
 
      // Transitions from FAILED state
     .addTransition(RMAppState.FAILED, RMAppState.FAILED,
-        RMAppEventType.KILL)
+        EnumSet.of(RMAppEventType.KILL, RMAppEventType.APP_SAVED))
      // ignorable transitions
     .addTransition(RMAppState.FAILED, RMAppState.FAILED, 
         RMAppEventType.NODE_UPDATE)
@@ -194,7 +207,7 @@ public class RMAppImpl implements RMApp, Recoverable {
         EnumSet.of(RMAppEventType.APP_ACCEPTED,
             RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
             RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
-            RMAppEventType.ATTEMPT_KILLED))
+            RMAppEventType.ATTEMPT_KILLED, RMAppEventType.APP_SAVED))
      // ignorable transitions
     .addTransition(RMAppState.KILLED, RMAppState.KILLED,
         RMAppEventType.NODE_UPDATE)
@@ -358,6 +371,8 @@ public class RMAppImpl implements RMApp, Recoverable {
     switch(rmAppState) {
     case NEW:
       return YarnApplicationState.NEW;
+    case NEW_SAVING:
+      return YarnApplicationState.NEW_SAVING;
     case SUBMITTED:
       return YarnApplicationState.SUBMITTED;
     case ACCEPTED:
@@ -378,6 +393,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) {
     switch(state) {
     case NEW:
+    case NEW_SAVING:
     case SUBMITTED:
     case ACCEPTED:
     case RUNNING:
@@ -591,6 +607,19 @@ public class RMAppImpl implements RMApp, Recoverable {
   
   private static final class StartAppAttemptTransition extends RMAppTransition {
     public void transition(RMAppImpl app, RMAppEvent event) {
+      if (event.getType().equals(RMAppEventType.APP_SAVED)) {
+        assert app.getState().equals(RMAppState.NEW_SAVING);
+        RMAppStoredEvent storeEvent = (RMAppStoredEvent) event;
+        if(storeEvent.getStoredException() != null) {
+          // For HA this exception needs to be handled by giving up
+          // master status if we got fenced
+          LOG.error("Failed to store application: "
+              + storeEvent.getApplicationId(),
+              storeEvent.getStoredException());
+          ExitUtil.terminate(1, storeEvent.getStoredException());
+        }
+      }
+
       app.createNewAttempt(true);
     };
   }
@@ -603,6 +632,18 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
   }
 
+  private static final class RMAppSavingTransition extends RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      // If recovery is enabled then store the application information in a
+      // non-blocking call so make sure that RM has stored the information
+      // needed to restart the AM after RM restart without further client
+      // communication
+      LOG.info("Storing application with id " + app.applicationId);
+      app.rmContext.getStateStore().storeApplication(app);
+    }
+  }
+
   private static class AppFinishedTransition extends FinalTransition {
     public void transition(RMAppImpl app, RMAppEvent event) {
       RMAppFinishedAttemptEvent finishedEvent =

+ 9 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java

@@ -19,5 +19,13 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 public enum RMAppState {
-  NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHING, FINISHED, FAILED, KILLED
+  NEW,
+  NEW_SAVING,
+  SUBMITTED,
+  ACCEPTED,
+  RUNNING,
+  FINISHING,
+  FINISHED,
+  FAILED,
+  KILLED
 }

+ 36 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppStoredEvent.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public class RMAppStoredEvent extends RMAppEvent {
+
+  private final Exception storedException;
+
+  public RMAppStoredEvent(ApplicationId appId, Exception storedException) {
+    super(appId, RMAppEventType.APP_SAVED);
+    this.storedException = storedException;
+  }
+
+  public Exception getStoredException() {
+    return storedException;
+  }
+
+}

+ 10 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java

@@ -278,9 +278,7 @@ public class AppSchedulable extends Schedulable {
     }
   }
 
-
-  @Override
-  public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+  private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
     LOG.info("Node offered to app: " + getName() + " reserved: " + reserved);
 
     if (reserved) {
@@ -345,4 +343,13 @@ public class AppSchedulable extends Schedulable {
     }
     return Resources.none();
   }
+
+  public Resource assignReservedContainer(FSSchedulerNode node) {
+    return assignContainer(node, true);
+  }
+
+  @Override
+  public Resource assignContainer(FSSchedulerNode node) {
+    return assignContainer(node, false);
+  }
 }

+ 23 - 36
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java

@@ -40,9 +40,6 @@ public class FSLeafQueue extends FSQueue {
     
   private final List<AppSchedulable> appScheds = 
       new ArrayList<AppSchedulable>();
-
-  /** Scheduling mode for jobs inside the queue (fair or FIFO) */
-  private SchedulingMode schedulingMode;
   
   private final FairScheduler scheduler;
   private final QueueManager queueMgr;
@@ -86,13 +83,18 @@ public class FSLeafQueue extends FSQueue {
     return appScheds;
   }
 
-  public void setSchedulingMode(SchedulingMode mode) {
-    this.schedulingMode = mode;
+  @Override
+  public void setPolicy(SchedulingPolicy policy)
+      throws AllocationConfigurationException {
+    if (!SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)) {
+      throwPolicyDoesnotApplyException(policy);
+    }
+    super.policy = policy;
   }
   
   @Override
-  public void recomputeFairShares() {
-    schedulingMode.computeShares(getAppSchedulables(), getFairShare());
+  public void recomputeShares() {
+    policy.computeShares(getAppSchedulables(), getFairShare());
   }
 
   @Override
@@ -136,42 +138,27 @@ public class FSLeafQueue extends FSQueue {
   }
 
   @Override
-  public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
-    LOG.debug("Node offered to queue: " + getName() + " reserved: " + reserved);
-    // If this queue is over its limit, reject
-    if (Resources.greaterThan(getResourceUsage(),
-        queueMgr.getMaxResources(getName()))) {
-      return Resources.none();
+  public Resource assignContainer(FSSchedulerNode node) {
+    Resource assigned = Resources.none();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node offered to queue: " + getName());
     }
 
-    // If this node already has reserved resources for an app, first try to
-    // finish allocating resources for that app.
-    if (reserved) {
-      for (AppSchedulable sched : appScheds) {
-        if (sched.getApp().getApplicationAttemptId() ==
-            node.getReservedContainer().getApplicationAttemptId()) {
-          return sched.assignContainer(node, reserved);
-        }
-      }
-      return Resources.none(); // We should never get here
+    if (!assignContainerPreCheck(node)) {
+      return assigned;
     }
 
-    // Otherwise, chose app to schedule based on given policy.
-    else {
-      Comparator<Schedulable> comparator = schedulingMode.getComparator();
-
-      Collections.sort(appScheds, comparator);
-      for (AppSchedulable sched: appScheds) {
-        if (sched.getRunnable()) {
-          Resource assignedResource = sched.assignContainer(node, reserved);
-          if (!assignedResource.equals(Resources.none())) {
-            return assignedResource;
-          }
+    Comparator<Schedulable> comparator = policy.getComparator();
+    Collections.sort(appScheds, comparator);
+    for (AppSchedulable sched : appScheds) {
+      if (sched.getRunnable()) {
+        assigned = sched.assignContainer(node);
+        if (Resources.greaterThan(assigned, Resources.none())) {
+          break;
         }
       }
-
-      return Resources.none();
     }
+    return assigned;
   }
 
   @Override

+ 35 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -33,7 +34,6 @@ public class FSParentQueue extends FSQueue {
   private static final Log LOG = LogFactory.getLog(
       FSParentQueue.class.getName());
 
-
   private final List<FSQueue> childQueues = 
       new ArrayList<FSQueue>();
   private final QueueManager queueMgr;
@@ -50,11 +50,11 @@ public class FSParentQueue extends FSQueue {
   }
 
   @Override
-  public void recomputeFairShares() {
-    SchedulingMode.getDefault().computeShares(childQueues, getFairShare());
+  public void recomputeShares() {
+    policy.computeShares(childQueues, getFairShare());
     for (FSQueue childQueue : childQueues) {
       childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare());
-      childQueue.recomputeFairShares();
+      childQueue.recomputeShares();
     }
   }
 
@@ -131,13 +131,41 @@ public class FSParentQueue extends FSQueue {
   }
 
   @Override
-  public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
-    throw new IllegalStateException(
-        "Parent queue should not be assigned container");
+  public Resource assignContainer(FSSchedulerNode node) {
+    Resource assigned = Resources.none();
+
+    // If this queue is over its limit, reject
+    if (Resources.greaterThan(getResourceUsage(),
+        queueMgr.getMaxResources(getName()))) {
+      return assigned;
+    }
+
+    Collections.sort(childQueues, policy.getComparator());
+    for (FSQueue child : childQueues) {
+      assigned = child.assignContainer(node);
+      if (node.getReservedContainer() != null
+          || Resources.greaterThan(assigned, Resources.none())) {
+        break;
+      }
+    }
+    return assigned;
   }
 
   @Override
   public Collection<FSQueue> getChildQueues() {
     return childQueues;
   }
+
+  @Override
+  public void setPolicy(SchedulingPolicy policy)
+      throws AllocationConfigurationException {
+    boolean allowed =
+        SchedulingPolicy.isApplicableTo(policy, (this == queueMgr
+            .getRootQueue()) ? SchedulingPolicy.DEPTH_ROOT
+            : SchedulingPolicy.DEPTH_INTERMEDIATE);
+    if (!allowed) {
+      throwPolicyDoesnotApplyException(policy);
+    }
+    super.policy = policy;
+  }
 }

+ 32 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java

@@ -45,6 +45,8 @@ public abstract class FSQueue extends Schedulable implements Queue {
   protected final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
   
+  protected SchedulingPolicy policy = SchedulingPolicy.getDefault();
+
   public FSQueue(String name, QueueManager queueMgr, 
       FairScheduler scheduler, FSParentQueue parent) {
     this.name = name;
@@ -63,6 +65,19 @@ public abstract class FSQueue extends Schedulable implements Queue {
     return name;
   }
   
+  public SchedulingPolicy getPolicy() {
+    return policy;
+  }
+
+  protected void throwPolicyDoesnotApplyException(SchedulingPolicy policy)
+      throws AllocationConfigurationException {
+    throw new AllocationConfigurationException("SchedulingPolicy " + policy
+        + " does not apply to queue " + getName());
+  }
+
+  public abstract void setPolicy(SchedulingPolicy policy)
+      throws AllocationConfigurationException;
+
   @Override
   public double getWeight() {
     return queueMgr.getQueueWeight(getName());
@@ -130,13 +145,27 @@ public abstract class FSQueue extends Schedulable implements Queue {
   }
   
   /**
-   * Recomputes the fair shares for all queues and applications
-   * under this queue.
+   * Recomputes the shares for all child queues and applications based on this
+   * queue's current share
    */
-  public abstract void recomputeFairShares();
+  public abstract void recomputeShares();
   
   /**
    * Gets the children of this queue, if any.
    */
   public abstract Collection<FSQueue> getChildQueues();
+
+  /**
+   * Helper method to check if the queue should attempt assigning resources
+   * 
+   * @return true if check passes (can assign) or false otherwise
+   */
+  protected boolean assignContainerPreCheck(FSSchedulerNode node) {
+    if (Resources.greaterThan(getResourceUsage(),
+        queueMgr.getMaxResources(getName()))
+        || node.getReservedContainer() != null) {
+      return false;
+    }
+    return true;
+  }
 }

+ 7 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java

@@ -52,6 +52,7 @@ public class FSSchedulerNode extends SchedulerNode {
   private volatile int numContainers;
 
   private RMContainer reservedContainer;
+  private AppSchedulable reservedAppSchedulable;
   
   /* set of containers that are allocated containers */
   private final Map<ContainerId, RMContainer> launchedContainers = 
@@ -221,6 +222,7 @@ public class FSSchedulerNode extends SchedulerNode {
           " on node " + this + " for application " + application);
     }
     this.reservedContainer = reservedContainer;
+    this.reservedAppSchedulable = application.getAppSchedulable();
   }
 
   public synchronized void unreserveResource(
@@ -237,11 +239,15 @@ public class FSSchedulerNode extends SchedulerNode {
           " on node " + this);
     }
     
-    reservedContainer = null;
+    this.reservedContainer = null;
+    this.reservedAppSchedulable = null;
   }
 
   public synchronized RMContainer getReservedContainer() {
     return reservedContainer;
   }
 
+  public synchronized AppSchedulable getReservedAppSchedulable() {
+    return reservedAppSchedulable;
+  }
 }

+ 14 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -161,7 +161,7 @@ public class FairScheduler implements ResourceScheduler {
   protected boolean assignMultiple; // Allocate multiple containers per
                                     // heartbeat
   protected int maxAssign; // Max containers to assign per heartbeat
-  
+
   public FairScheduler() {
     clock = new SystemClock();
     queueMgr = new QueueManager(this);
@@ -217,7 +217,7 @@ public class FairScheduler implements ResourceScheduler {
     rootQueue.setFairShare(clusterCapacity);
     // Recursively compute fair shares for all queues
     // and update metrics
-    rootQueue.recomputeFairShares();
+    rootQueue.recomputeShares();
 
     // Update recorded capacity of root queue (child queues are updated
     // when fair share is calculated).
@@ -786,39 +786,24 @@ public class FairScheduler implements ResourceScheduler {
     // 1. Check for reserved applications
     // 2. Schedule if there are no reservations
 
-    // If we have have an application that has reserved a resource on this node
-    // already, we try to complete the reservation.
-    RMContainer reservedContainer = node.getReservedContainer();
-    if (reservedContainer != null) {
-      FSSchedulerApp reservedApplication =
-          applications.get(reservedContainer.getApplicationAttemptId());
+    AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
+    if (reservedAppSchedulable != null) {
+      // Reservation exists; try to fulfill the reservation
+      LOG.info("Trying to fulfill reservation for application "
+          + reservedAppSchedulable.getApp().getApplicationAttemptId()
+          + " on node: " + nm);
 
-      // Try to fulfill the reservation
-      LOG.info("Trying to fulfill reservation for application " +
-          reservedApplication.getApplicationId() + " on node: " + nm);
-
-      FSLeafQueue queue = queueMgr.getLeafQueue(reservedApplication.getQueueName());
-      queue.assignContainer(node, true);
+      node.getReservedAppSchedulable().assignReservedContainer(node);
     }
-
-    // Otherwise, schedule at queue which is furthest below fair share
     else {
+      // No reservation, schedule at queue which is farthest below fair share
       int assignedContainers = 0;
       while (node.getReservedContainer() == null) {
-        // At most one task is scheduled each iteration of this loop
-        List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>(
-            queueMgr.getLeafQueues());
-        Collections.sort(scheds, SchedulingMode.getDefault().getComparator());
         boolean assignedContainer = false;
-        for (FSLeafQueue sched : scheds) {
-          Resource assigned = sched.assignContainer(node, false);
-          if (Resources.greaterThan(assigned, Resources.none()) ||
-              node.getReservedContainer() != null) {
-            eventLog.log("ASSIGN", nm.getHostName(), assigned);
-            assignedContainers++;
-            assignedContainer = true;
-            break;
-          }
+        if (Resources.greaterThan(
+            queueMgr.getRootQueue().assignContainer(node),
+            Resources.none())) {
+          assignedContainer = true;
         }
         if (!assignedContainer) { break; }
         if (!assignMultiple) { break; }

+ 29 - 25
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java

@@ -143,7 +143,6 @@ public class QueueManager {
         if (leafQueue == null) {
           return null;
         }
-        leafQueue.setSchedulingMode(info.defaultSchedulingMode);
         queue = leafQueue;
       } else if (queue instanceof FSParentQueue) {
         return null;
@@ -302,7 +301,7 @@ public class QueueManager {
     Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
     Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
     Map<String, Double> queueWeights = new HashMap<String, Double>();
-    Map<String, SchedulingMode> queueModes = new HashMap<String, SchedulingMode>();
+    Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
     Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
     Map<String, Map<QueueACL, AccessControlList>> queueAcls =
         new HashMap<String, Map<QueueACL, AccessControlList>>();
@@ -310,7 +309,7 @@ public class QueueManager {
     int queueMaxAppsDefault = Integer.MAX_VALUE;
     long fairSharePreemptionTimeout = Long.MAX_VALUE;
     long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
-    SchedulingMode defaultSchedulingMode = SchedulingMode.getDefault();
+    SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault();
 
     // Remember all queue names so we can display them on web UI, etc.
     List<String> queueNamesInAllocFile = new ArrayList<String>();
@@ -339,7 +338,7 @@ public class QueueManager {
       if ("queue".equals(element.getTagName()) ||
     	  "pool".equals(element.getTagName())) {
         loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps,
-            userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
+            userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
             queueAcls, queueNamesInAllocFile);
       } else if ("user".equals(element.getTagName())) {
         String userName = element.getAttribute("name");
@@ -370,11 +369,12 @@ public class QueueManager {
       } else if ("queueMaxAppsDefault".equals(element.getTagName())) {
         String text = ((Text)element.getFirstChild()).getData().trim();
         int val = Integer.parseInt(text);
-        queueMaxAppsDefault = val;}
-      else if ("defaultQueueSchedulingMode".equals(element.getTagName())) {
+        queueMaxAppsDefault = val;
+      } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
+          || "defaultQueueSchedulingMode".equals(element.getTagName())) {
         String text = ((Text)element.getFirstChild()).getData().trim();
-        SchedulingMode.setDefault(text);
-        defaultSchedulingMode = SchedulingMode.getDefault();
+        SchedulingPolicy.setDefault(text);
+        defaultSchedPolicy = SchedulingPolicy.getDefault();
       } else {
         LOG.warn("Bad element in allocations file: " + element.getTagName());
       }
@@ -385,7 +385,7 @@ public class QueueManager {
     synchronized (this) {
       info = new QueueManagerInfo(minQueueResources, maxQueueResources,
           queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
-          queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts,
+          queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
           queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
       
       // Root queue should have empty ACLs.  As a queue's ACL is the union of
@@ -396,14 +396,15 @@ public class QueueManager {
       rootAcls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(" "));
       rootAcls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(" "));
       queueAcls.put(ROOT_QUEUE, rootAcls);
-
+ 
+      // Create all queus
       for (String name: queueNamesInAllocFile) {
-        FSLeafQueue queue = getLeafQueue(name);
-        if (queueModes.containsKey(name)) {
-          queue.setSchedulingMode(queueModes.get(name));
-        } else {
-          queue.setSchedulingMode(defaultSchedulingMode);
-        }
+        getLeafQueue(name);
+      }
+      
+      // Set custom policies as specified
+      for (Map.Entry<String, SchedulingPolicy> entry : queuePolicies.entrySet()) {
+        queues.get(entry.getKey()).setPolicy(entry.getValue());
       }
     }
   }
@@ -414,7 +415,8 @@ public class QueueManager {
   private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
       Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
       Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
-      Map<String, SchedulingMode> queueModes, Map<String, Long> minSharePreemptionTimeouts,
+      Map<String, SchedulingPolicy> queuePolicies,
+      Map<String, Long> minSharePreemptionTimeouts,
       Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile) 
       throws AllocationConfigurationException {
     String queueName = parentName + "." + element.getAttribute("name");
@@ -448,9 +450,10 @@ public class QueueManager {
         String text = ((Text)field.getFirstChild()).getData().trim();
         long val = Long.parseLong(text) * 1000L;
         minSharePreemptionTimeouts.put(queueName, val);
-      } else if ("schedulingMode".equals(field.getTagName())) {
+      } else if ("schedulingPolicy".equals(field.getTagName())
+          || "schedulingMode".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
-        queueModes.put(queueName, SchedulingMode.parse(text));
+        queuePolicies.put(queueName, SchedulingPolicy.parse(text));
       } else if ("aclSubmitApps".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
         acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
@@ -459,8 +462,9 @@ public class QueueManager {
         acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
       } else if ("queue".endsWith(field.getTagName()) || 
           "pool".equals(field.getTagName())) {
-        loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps,
-            userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
+        loadQueue(queueName, field, minQueueResources, maxQueueResources,
+            queueMaxApps, userMaxApps, queueWeights, queuePolicies,
+            minSharePreemptionTimeouts,
             queueAcls, queueNamesInAllocFile);
         isLeaf = false;
       }
@@ -615,13 +619,13 @@ public class QueueManager {
     // below half its fair share for this long, it is allowed to preempt tasks.
     public final long fairSharePreemptionTimeout;
 
-    public final SchedulingMode defaultSchedulingMode;
+    public final SchedulingPolicy defaultSchedulingPolicy;
     
     public QueueManagerInfo(Map<String, Resource> minQueueResources, 
         Map<String, Resource> maxQueueResources, 
         Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
         Map<String, Double> queueWeights, int userMaxAppsDefault,
-        int queueMaxAppsDefault, SchedulingMode defaultSchedulingMode, 
+        int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy, 
         Map<String, Long> minSharePreemptionTimeouts, 
         Map<String, Map<QueueACL, AccessControlList>> queueAcls,
         long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) {
@@ -632,7 +636,7 @@ public class QueueManager {
       this.queueWeights = queueWeights;
       this.userMaxAppsDefault = userMaxAppsDefault;
       this.queueMaxAppsDefault = queueMaxAppsDefault;
-      this.defaultSchedulingMode = defaultSchedulingMode;
+      this.defaultSchedulingPolicy = defaultSchedulingPolicy;
       this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
       this.queueAcls = queueAcls;
       this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
@@ -651,7 +655,7 @@ public class QueueManager {
       minSharePreemptionTimeouts = new HashMap<String, Long>();
       defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
       fairSharePreemptionTimeout = Long.MAX_VALUE;
-      defaultSchedulingMode = SchedulingMode.getDefault();
+      defaultSchedulingPolicy = SchedulingPolicy.getDefault();
     }
   }
 }

+ 2 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java

@@ -93,11 +93,9 @@ public abstract class Schedulable {
 
   /**
    * Assign a container on this node if possible, and return the amount of
-   * resources assigned. If {@code reserved} is true, it means a reservation
-   * already exists on this node, and the schedulable should fulfill that
-   * reservation if possible.
+   * resources assigned.
    */
-  public abstract Resource assignContainer(FSSchedulerNode node, boolean reserved);
+  public abstract Resource assignContainer(FSSchedulerNode node);
 
   /** Assign a fair share to this Schedulable. */
   public void setFairShare(Resource fairShare) {

+ 0 - 118
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java

@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
-
-@Public
-@Unstable
-public abstract class SchedulingMode {
-  private static final ConcurrentHashMap<Class<? extends SchedulingMode>, SchedulingMode> instances =
-      new ConcurrentHashMap<Class<? extends SchedulingMode>, SchedulingMode>();
-
-  private static SchedulingMode DEFAULT_MODE =
-      getInstance(FairSchedulingMode.class);
-  
-  public static SchedulingMode getDefault() {
-    return DEFAULT_MODE;
-  }
-
-  public static void setDefault(String className)
-      throws AllocationConfigurationException {
-    DEFAULT_MODE = parse(className);
-  }
-
-  /**
-   * Returns a {@link SchedulingMode} instance corresponding to the passed clazz
-   */
-  public static SchedulingMode getInstance(Class<? extends SchedulingMode> clazz) {
-    SchedulingMode mode = instances.get(clazz);
-    if (mode == null) {
-      mode = ReflectionUtils.newInstance(clazz, null);
-      instances.put(clazz, mode);
-    }
-    return mode;
-  }
-
-  /**
-   * Returns {@link SchedulingMode} instance corresponding to the
-   * {@link SchedulingMode} passed as a string. The mode can be "fair" for
-   * FairSchedulingMode of "fifo" for FifoSchedulingMode. For custom
-   * {@link SchedulingMode}s in the RM classpath, the mode should be canonical
-   * class name of the {@link SchedulingMode}.
-   * 
-   * @param mode canonical class name or "fair" or "fifo"
-   * @throws AllocationConfigurationException
-   */
-  @SuppressWarnings("unchecked")
-  public static SchedulingMode parse(String mode)
-      throws AllocationConfigurationException {
-    @SuppressWarnings("rawtypes")
-    Class clazz;
-    String text = mode.toLowerCase();
-    if (text.equals("fair")) {
-      clazz = FairSchedulingMode.class;
-    } else if (text.equals("fifo")) {
-      clazz = FifoSchedulingMode.class;
-    } else {
-      try {
-        clazz = Class.forName(mode);
-      } catch (ClassNotFoundException cnfe) {
-        throw new AllocationConfigurationException(mode
-            + " SchedulingMode class not found!");
-      }
-    }
-    if (!SchedulingMode.class.isAssignableFrom(clazz)) {
-      throw new AllocationConfigurationException(mode
-          + " does not extend SchedulingMode");
-    }
-    return getInstance(clazz);
-  }
-
-  /**
-   * @return returns the name of SchedulingMode
-   */
-  public abstract String getName();
-
-  /**
-   * The comparator returned by this method is to be used for sorting the
-   * {@link Schedulable}s in that queue.
-   * 
-   * @return the comparator to sort by
-   */
-  public abstract Comparator<Schedulable> getComparator();
-
-  /**
-   * Computes and updates the shares of {@link Schedulable}s as per the
-   * SchedulingMode, to be used later at schedule time.
-   * 
-   * @param schedulables {@link Schedulable}s whose shares are to be updated
-   * @param totalResources Total {@link Resource}s in the cluster
-   */
-  public abstract void computeShares(
-      Collection<? extends Schedulable> schedulables, Resource totalResources);
-}

+ 145 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java

@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
+
+@Public
+@Unstable
+public abstract class SchedulingPolicy {
+  private static final ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy> instances =
+      new ConcurrentHashMap<Class<? extends SchedulingPolicy>, SchedulingPolicy>();
+
+  private static SchedulingPolicy DEFAULT_POLICY =
+      getInstance(FairSharePolicy.class);
+  
+  public static final byte DEPTH_LEAF = (byte) 1;
+  public static final byte DEPTH_INTERMEDIATE = (byte) 2;
+  public static final byte DEPTH_ROOT = (byte) 4;
+  public static final byte DEPTH_PARENT = (byte) 6; // Root and Intermediate
+  public static final byte DEPTH_ANY = (byte) 7;
+
+  public static SchedulingPolicy getDefault() {
+    return DEFAULT_POLICY;
+  }
+
+  public static void setDefault(String className)
+      throws AllocationConfigurationException {
+    DEFAULT_POLICY = parse(className);
+  }
+
+  /**
+   * Returns a {@link SchedulingPolicy} instance corresponding to the passed clazz
+   */
+  public static SchedulingPolicy getInstance(Class<? extends SchedulingPolicy> clazz) {
+    SchedulingPolicy policy = instances.get(clazz);
+    if (policy == null) {
+      policy = ReflectionUtils.newInstance(clazz, null);
+      instances.put(clazz, policy);
+    }
+    return policy;
+  }
+
+  /**
+   * Returns {@link SchedulingPolicy} instance corresponding to the
+   * {@link SchedulingPolicy} passed as a string. The policy can be "fair" for
+   * FairsharePolicy or "fifo" for FifoPolicy. For custom
+   * {@link SchedulingPolicy}s in the RM classpath, the policy should be
+   * canonical class name of the {@link SchedulingPolicy}.
+   * 
+   * @param policy canonical class name or "fair" or "fifo"
+   * @throws AllocationConfigurationException
+   */
+  @SuppressWarnings("unchecked")
+  public static SchedulingPolicy parse(String policy)
+      throws AllocationConfigurationException {
+    @SuppressWarnings("rawtypes")
+    Class clazz;
+    String text = policy.toLowerCase();
+    if (text.equals("fair")) {
+      clazz = FairSharePolicy.class;
+    } else if (text.equals("fifo")) {
+      clazz = FifoPolicy.class;
+    } else {
+      try {
+        clazz = Class.forName(policy);
+      } catch (ClassNotFoundException cnfe) {
+        throw new AllocationConfigurationException(policy
+            + " SchedulingPolicy class not found!");
+      }
+    }
+    if (!SchedulingPolicy.class.isAssignableFrom(clazz)) {
+      throw new AllocationConfigurationException(policy
+          + " does not extend SchedulingPolicy");
+    }
+    return getInstance(clazz);
+  }
+
+  /**
+   * @return returns the name of {@link SchedulingPolicy}
+   */
+  public abstract String getName();
+
+  /**
+   * Specifies the depths in the hierarchy, this {@link SchedulingPolicy}
+   * applies to
+   * 
+   * @return depth equal to one of fields {@link SchedulingPolicy}#DEPTH_*
+   */
+  public abstract byte getApplicableDepth();
+
+  /**
+   * Checks if the specified {@link SchedulingPolicy} can be used for a queue at
+   * the specified depth in the hierarchy
+   * 
+   * @param policy {@link SchedulingPolicy} we are checking the
+   *          depth-applicability for
+   * @param depth queue's depth in the hierarchy
+   * @return true if policy is applicable to passed depth, false otherwise
+   */
+  public static boolean isApplicableTo(SchedulingPolicy policy, byte depth) {
+    return ((policy.getApplicableDepth() & depth) == depth) ? true : false;
+  }
+
+  /**
+   * The comparator returned by this method is to be used for sorting the
+   * {@link Schedulable}s in that queue.
+   * 
+   * @return the comparator to sort by
+   */
+  public abstract Comparator<Schedulable> getComparator();
+
+  /**
+   * Computes and updates the shares of {@link Schedulable}s as per the
+   * {@link SchedulingPolicy}, to be used later at schedule time.
+   * 
+   * @param schedulables {@link Schedulable}s whose shares are to be updated
+   * @param totalResources Total {@link Resource}s in the cluster
+   */
+  public abstract void computeShares(
+      Collection<? extends Schedulable> schedulables, Resource totalResources);
+}

+ 9 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes;
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
 
 import java.io.Serializable;
 import java.util.Collection;
@@ -24,13 +24,13 @@ import java.util.Comparator;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
 
 import com.google.common.annotations.VisibleForTesting;
 
-public class FairSchedulingMode extends SchedulingMode {
+public class FairSharePolicy extends SchedulingPolicy {
   @VisibleForTesting
-  public static final String NAME = "FairShare";
+  public static final String NAME = "Fairshare";
   private FairShareComparator comparator = new FairShareComparator();
 
   @Override
@@ -211,4 +211,9 @@ public class FairSchedulingMode extends SchedulingMode {
     share = Math.min(share, sched.getDemand().getMemory());
     return Resources.createResource((int) share);
   }
+
+  @Override
+  public byte getApplicableDepth() {
+    return SchedulingPolicy.DEPTH_ANY;
+  }
 }

+ 8 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes;
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
 
 import java.io.Serializable;
 import java.util.Collection;
@@ -24,11 +24,11 @@ import java.util.Comparator;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
 
 import com.google.common.annotations.VisibleForTesting;
 
-public class FifoSchedulingMode extends SchedulingMode {
+public class FifoPolicy extends SchedulingPolicy {
   @VisibleForTesting
   public static final String NAME = "FIFO";
   private FifoComparator comparator = new FifoComparator();
@@ -73,4 +73,9 @@ public class FifoSchedulingMode extends SchedulingMode {
       sched.setFairShare(Resources.createResource(0));
     }
   }
+
+  @Override
+  public byte getApplicableDepth() {
+    return SchedulingPolicy.DEPTH_LEAF;
+  }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java

@@ -63,6 +63,7 @@ public class RmController extends Controller {
     // limit applications to those in states relevant to scheduling
     set(YarnWebParams.APP_STATE, StringHelper.cjoin(
         RMAppState.NEW.toString(),
+        RMAppState.NEW_SAVING.toString(),
         RMAppState.SUBMITTED.toString(),
         RMAppState.ACCEPTED.toString(),
         RMAppState.RUNNING.toString(),

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java

@@ -83,7 +83,9 @@ public class AppInfo {
       String trackingUrl = app.getTrackingUrl();
       this.state = app.getState();
       this.trackingUrlIsNotReady = trackingUrl == null || trackingUrl.isEmpty()
-          || RMAppState.NEW == this.state || RMAppState.SUBMITTED == this.state
+          || RMAppState.NEW == this.state
+          || RMAppState.NEW_SAVING == this.state
+          || RMAppState.SUBMITTED == this.state
           || RMAppState.ACCEPTED == this.state;
       this.trackingUI = this.trackingUrlIsNotReady ? "UNASSIGNED" : (app
           .getFinishTime() == 0 ? "ApplicationMaster" : "History");

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java

@@ -164,7 +164,8 @@ public class TestAppManager{
     }
     public void submitApplication(
         ApplicationSubmissionContext submissionContext) {
-      super.submitApplication(submissionContext, System.currentTimeMillis());
+      super.submitApplication(
+          submissionContext, System.currentTimeMillis(), false);
     }
   }
 

+ 108 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -138,8 +139,9 @@ public class TestRMAppTransitions {
         mock(ContainerAllocationExpirer.class);
     AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
     AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
+    RMStateStore store = mock(RMStateStore.class);
     this.rmContext =
-        new RMContextImpl(rmDispatcher,
+        new RMContextImpl(rmDispatcher, store,
           containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
           null, new ApplicationTokenSecretManager(conf),
           new RMContainerTokenSecretManager(conf),
@@ -176,6 +178,9 @@ public class TestRMAppTransitions {
     if(submissionContext == null) {
       submissionContext = new ApplicationSubmissionContextPBImpl();
     }
+    // applicationId will not be used because RMStateStore is mocked,
+    // but applicationId is still set for safety
+    submissionContext.setApplicationId(applicationId);
 
     RMApp application =
         new RMAppImpl(applicationId, rmContext, conf, name, user, queue,
@@ -264,21 +269,45 @@ public class TestRMAppTransitions {
         diag.toString().matches(regex));
   }
 
-  protected RMApp testCreateAppSubmitted(
+  protected RMApp testCreateAppNewSaving(
       ApplicationSubmissionContext submissionContext) throws IOException {
   RMApp application = createNewTestApp(submissionContext);
-    // NEW => SUBMITTED event RMAppEventType.START
+    // NEW => NEW_SAVING event RMAppEventType.START
     RMAppEvent event = 
         new RMAppEvent(application.getApplicationId(), RMAppEventType.START);
     application.handle(event);
     assertStartTimeSet(application);
+    assertAppState(RMAppState.NEW_SAVING, application);
+    return application;
+  }
+
+  protected RMApp testCreateAppSubmittedNoRecovery(
+      ApplicationSubmissionContext submissionContext) throws IOException {
+  RMApp application = testCreateAppNewSaving(submissionContext);
+    // NEW_SAVING => SUBMITTED event RMAppEventType.APP_SAVED
+    RMAppEvent event =
+        new RMAppStoredEvent(application.getApplicationId(), null);
+    application.handle(event);
+    assertStartTimeSet(application);
+    assertAppState(RMAppState.SUBMITTED, application);
+    return application;
+  }
+
+  protected RMApp testCreateAppSubmittedRecovery(
+      ApplicationSubmissionContext submissionContext) throws IOException {
+    RMApp application = createNewTestApp(submissionContext);
+    // NEW => SUBMITTED event RMAppEventType.RECOVER
+    RMAppEvent event =
+        new RMAppEvent(application.getApplicationId(), RMAppEventType.RECOVER);
+    application.handle(event);
+    assertStartTimeSet(application);
     assertAppState(RMAppState.SUBMITTED, application);
     return application;
   }
 
   protected RMApp testCreateAppAccepted(
       ApplicationSubmissionContext submissionContext) throws IOException {
-    RMApp application = testCreateAppSubmitted(submissionContext);
+    RMApp application = testCreateAppSubmittedNoRecovery(submissionContext);
   // SUBMITTED => ACCEPTED event RMAppEventType.APP_ACCEPTED
     RMAppEvent event = 
         new RMAppEvent(application.getApplicationId(), 
@@ -375,7 +404,13 @@ public class TestRMAppTransitions {
         application.getDiagnostics().indexOf(diagMsg) != -1);
   }
 
-  @Test
+  @Test (timeout = 30000)
+  public void testAppRecoverPath() throws IOException {
+    LOG.info("--- START: testAppRecoverPath ---");
+    testCreateAppSubmittedRecovery(null);
+  }
+
+  @Test (timeout = 30000)
   public void testAppNewKill() throws IOException {
     LOG.info("--- START: testAppNewKill ---");
 
@@ -402,11 +437,38 @@ public class TestRMAppTransitions {
     assertFailed(application, rejectedText);
   }
 
-  @Test
+  @Test (timeout = 30000)
+  public void testAppNewSavingKill() throws IOException {
+    LOG.info("--- START: testAppNewSavingKill ---");
+
+    RMApp application = testCreateAppNewSaving(null);
+    // NEW_SAVING => KILLED event RMAppEventType.KILL
+    RMAppEvent event =
+        new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+    application.handle(event);
+    rmDispatcher.await();
+    assertKilled(application);
+  }
+
+  @Test (timeout = 30000)
+  public void testAppNewSavingReject() throws IOException {
+    LOG.info("--- START: testAppNewSavingReject ---");
+
+    RMApp application = testCreateAppNewSaving(null);
+    // NEW_SAVING => FAILED event RMAppEventType.APP_REJECTED
+    String rejectedText = "Test Application Rejected";
+    RMAppEvent event =
+        new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
+    application.handle(event);
+    rmDispatcher.await();
+    assertFailed(application, rejectedText);
+  }
+
+  @Test (timeout = 30000)
   public void testAppSubmittedRejected() throws IOException {
     LOG.info("--- START: testAppSubmittedRejected ---");
 
-    RMApp application = testCreateAppSubmitted(null);
+    RMApp application = testCreateAppSubmittedNoRecovery(null);
     // SUBMITTED => FAILED event RMAppEventType.APP_REJECTED
     String rejectedText = "app rejected";
     RMAppEvent event = 
@@ -419,7 +481,7 @@ public class TestRMAppTransitions {
   @Test
   public void testAppSubmittedKill() throws IOException, InterruptedException {
     LOG.info("--- START: testAppSubmittedKill---");
-    RMApp application = testCreateAppSubmitted(null);
+    RMApp application = testCreateAppSubmittedNoRecovery(null);
     // SUBMITTED => KILLED event RMAppEventType.KILL
     RMAppEvent event = new RMAppEvent(application.getApplicationId(),
         RMAppEventType.KILL);
@@ -570,7 +632,37 @@ public class TestRMAppTransitions {
         "", diag.toString());
   }
 
-  @Test
+  @Test (timeout = 30000)
+  public void testAppFailedFailed() throws IOException {
+    LOG.info("--- START: testAppFailedFailed ---");
+
+    RMApp application = testCreateAppNewSaving(null);
+
+    // NEW_SAVING => FAILED event RMAppEventType.APP_REJECTED
+    RMAppEvent event =
+        new RMAppRejectedEvent(application.getApplicationId(), "");
+    application.handle(event);
+    rmDispatcher.await();
+    assertTimesAtFinish(application);
+    assertAppState(RMAppState.FAILED, application);
+
+    // FAILED => FAILED event RMAppEventType.KILL
+    event =
+        new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+    application.handle(event);
+    rmDispatcher.await();
+    assertTimesAtFinish(application);
+    assertAppState(RMAppState.FAILED, application);
+
+    // FAILED => FAILED event RMAppEventType.APP_SAVED
+    event = new RMAppStoredEvent(application.getApplicationId(), null);
+    application.handle(event);
+    rmDispatcher.await();
+    assertTimesAtFinish(application);
+    assertAppState(RMAppState.FAILED, application);
+  }
+
+  @Test (timeout = 30000)
   public void testAppKilledKilled() throws IOException {
     LOG.info("--- START: testAppKilledKilled ---");
 
@@ -616,6 +708,13 @@ public class TestRMAppTransitions {
     rmDispatcher.await();
     assertTimesAtFinish(application);
     assertAppState(RMAppState.KILLED, application);
+
+    // KILLED => KILLED event RMAppEventType.APP_SAVED
+    event = new RMAppStoredEvent(application.getApplicationId(), null);
+    application.handle(event);
+    rmDispatcher.await();
+    assertTimesAtFinish(application);
+    assertAppState(RMAppState.KILLED, application);
   }
 
   @Test

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java

@@ -68,7 +68,7 @@ public class FakeSchedulable extends Schedulable {
   }
   
   @Override
-  public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+  public Resource assignContainer(FSSchedulerNode node) {
     return null;
   }
 

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java

@@ -24,7 +24,7 @@ import java.util.List;
 import junit.framework.Assert;
 
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -33,12 +33,12 @@ import org.junit.Test;
  */
 public class TestComputeFairShares {
   private List<Schedulable> scheds;
-  private SchedulingMode schedulingMode;
+  private SchedulingPolicy schedulingMode;
   
   @Before
   public void setUp() throws Exception {
     scheds = new ArrayList<Schedulable>();
-    schedulingMode = new FairSchedulingMode();
+    schedulingMode = new FairSharePolicy();
   }
   
   /** 

+ 93 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -72,7 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -283,7 +284,7 @@ public class TestFairScheduler {
     assertEquals(capacity / 4, queue2.getFairShare().getMemory());
     assertEquals(capacity / 4, queue3.getFairShare().getMemory());
   }
-  
+
   @Test
   public void testHierarchicalQueuesSimilarParents() {
     QueueManager queueManager = scheduler.getQueueManager();
@@ -1358,7 +1359,7 @@ public class TestFairScheduler {
     FSSchedulerApp app2 = scheduler.applications.get(attId2);
     
     FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1");
-    queue1.setSchedulingMode(new FifoSchedulingMode());
+    queue1.setPolicy(new FifoPolicy());
     
     scheduler.update();
 
@@ -1380,7 +1381,80 @@ public class TestFairScheduler {
     assertEquals(2, app1.getLiveContainers().size());
     assertEquals(1, app2.getLiveContainers().size());
   }
-  
+
+  /**
+   * Test to verify the behavior of
+   * {@link FSQueue#assignContainer(FSSchedulerNode)})
+   * 
+   * Create two queues under root (fifoQueue and fairParent), and two queues
+   * under fairParent (fairChild1 and fairChild2). Submit two apps to the
+   * fifoQueue and one each to the fairChild* queues, all apps requiring 4
+   * containers each of the total 16 container capacity
+   * 
+   * Assert the number of containers for each app after 4, 8, 12 and 16 updates.
+   * 
+   * @throws Exception
+   */
+  @Test(timeout = 5000)
+  public void testAssignContainer() throws Exception {
+    final String user = "user1";
+    final String fifoQueue = "fifo";
+    final String fairParent = "fairParent";
+    final String fairChild1 = fairParent + ".fairChild1";
+    final String fairChild2 = fairParent + ".fairChild2";
+
+    RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(8192));
+    RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(8192));
+
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+
+    scheduler.handle(nodeEvent1);
+    scheduler.handle(nodeEvent2);
+
+    ApplicationAttemptId attId1 =
+        createSchedulingRequest(1024, fifoQueue, user, 4);
+    ApplicationAttemptId attId2 =
+        createSchedulingRequest(1024, fairChild1, user, 4);
+    ApplicationAttemptId attId3 =
+        createSchedulingRequest(1024, fairChild2, user, 4);
+    ApplicationAttemptId attId4 =
+        createSchedulingRequest(1024, fifoQueue, user, 4);
+
+    FSSchedulerApp app1 = scheduler.applications.get(attId1);
+    FSSchedulerApp app2 = scheduler.applications.get(attId2);
+    FSSchedulerApp app3 = scheduler.applications.get(attId3);
+    FSSchedulerApp app4 = scheduler.applications.get(attId4);
+
+    scheduler.getQueueManager().getLeafQueue(fifoQueue)
+        .setPolicy(SchedulingPolicy.parse("fifo"));
+    scheduler.update();
+
+    NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1);
+    NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
+
+    for (int i = 0; i < 8; i++) {
+      scheduler.handle(updateEvent1);
+      scheduler.handle(updateEvent2);
+      if ((i + 1) % 2 == 0) {
+        // 4 node updates: fifoQueue should have received 2, and fairChild*
+        // should have received one each
+        String ERR =
+            "Wrong number of assigned containers after " + (i + 1) + " updates";
+        if (i < 4) {
+          // app1 req still not met
+          assertEquals(ERR, (i + 1), app1.getLiveContainers().size());
+          assertEquals(ERR, 0, app4.getLiveContainers().size());
+        } else {
+          // app1 req has been met, app4 should be served now
+          assertEquals(ERR, 4, app1.getLiveContainers().size());
+          assertEquals(ERR, (i - 3), app4.getLiveContainers().size());
+        }
+        assertEquals(ERR, (i + 1) / 2, app2.getLiveContainers().size());
+        assertEquals(ERR, (i + 1) / 2, app3.getLiveContainers().size());
+      }
+    }
+  }
   
   @SuppressWarnings("unchecked")
   @Test
@@ -1411,6 +1485,7 @@ public class TestFairScheduler {
     ContainerLaunchContext clc =
         BuilderUtils.newContainerLaunchContext(user, null, null, null, null,
             null, null);
+    submissionContext.setApplicationId(applicationId);
     submissionContext.setAMContainerSpec(clc);
     RMApp application =
         new RMAppImpl(applicationId, resourceManager.getRMContext(), conf, name, user, 
@@ -1419,13 +1494,24 @@ public class TestFairScheduler {
     resourceManager.getRMContext().getRMApps().putIfAbsent(applicationId, application);
     application.handle(new RMAppEvent(applicationId, RMAppEventType.START));
 
+    final int MAX_TRIES=20;
+    int numTries = 0;
+    while (!application.getState().equals(RMAppState.SUBMITTED) &&
+        numTries < MAX_TRIES) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ex) {ex.printStackTrace();}
+      numTries++;
+    }
+    assertEquals("The application doesn't reach SUBMITTED.",
+        RMAppState.SUBMITTED, application.getState());
+
     ApplicationAttemptId attId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
     attId.setAttemptId(this.ATTEMPT_ID++);
     attId.setApplicationId(applicationId);
     scheduler.addApplication(attId, queue, user);
-    
-    final int MAX_TRIES=20;
-    int numTries = 0;
+
+    numTries = 0;
     while (application.getFinishTime() == 0 && numTries < MAX_TRIES) {
       try {
         Thread.sleep(100);

+ 0 - 59
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java

@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
-import org.junit.Test;
-
-public class TestSchedulingMode {
-
-  @Test(timeout = 1000)
-  public void testParseSchedulingMode() throws AllocationConfigurationException {
-
-    // Class name
-    SchedulingMode sm = SchedulingMode
-        .parse(FairSchedulingMode.class.getName());
-    assertTrue("Invalid scheduler name",
-        sm.getName().equals(FairSchedulingMode.NAME));
-
-    // Canonical name
-    sm = SchedulingMode.parse(FairSchedulingMode.class
-        .getCanonicalName());
-    assertTrue("Invalid scheduler name",
-        sm.getName().equals(FairSchedulingMode.NAME));
-
-    // Class
-    sm = SchedulingMode.getInstance(FairSchedulingMode.class);
-    assertTrue("Invalid scheduler name",
-        sm.getName().equals(FairSchedulingMode.NAME));
-
-    // Shortname - fair
-    sm = SchedulingMode.parse("fair");
-    assertTrue("Invalid scheduler name",
-        sm.getName().equals(FairSchedulingMode.NAME));
-
-    // Shortname - fifo
-    sm = SchedulingMode.parse("fifo");
-    assertTrue("Invalid scheduler name",
-        sm.getName().equals(FifoSchedulingMode.NAME));
-  }
-}

+ 109 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java

@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestSchedulingPolicy {
+
+  @Test(timeout = 1000)
+  public void testParseSchedulingPolicy()
+      throws AllocationConfigurationException {
+
+    // Class name
+    SchedulingPolicy sm = SchedulingPolicy
+        .parse(FairSharePolicy.class.getName());
+    assertTrue("Invalid scheduler name",
+        sm.getName().equals(FairSharePolicy.NAME));
+
+    // Canonical name
+    sm = SchedulingPolicy.parse(FairSharePolicy.class
+        .getCanonicalName());
+    assertTrue("Invalid scheduler name",
+        sm.getName().equals(FairSharePolicy.NAME));
+
+    // Class
+    sm = SchedulingPolicy.getInstance(FairSharePolicy.class);
+    assertTrue("Invalid scheduler name",
+        sm.getName().equals(FairSharePolicy.NAME));
+
+    // Shortname - fair
+    sm = SchedulingPolicy.parse("fair");
+    assertTrue("Invalid scheduler name",
+        sm.getName().equals(FairSharePolicy.NAME));
+
+    // Shortname - fifo
+    sm = SchedulingPolicy.parse("fifo");
+    assertTrue("Invalid scheduler name",
+        sm.getName().equals(FifoPolicy.NAME));
+  }
+
+  /**
+   * Trivial tests that make sure
+   * {@link SchedulingPolicy#isApplicableTo(SchedulingPolicy, byte)} works as
+   * expected for the possible values of depth
+   * 
+   * @throws AllocationConfigurationException
+   */
+  @Test(timeout = 1000)
+  public void testIsApplicableTo() throws AllocationConfigurationException {
+    final String ERR = "Broken SchedulingPolicy#isApplicableTo";
+    
+    // fifo
+    SchedulingPolicy policy = SchedulingPolicy.parse("fifo");
+    assertTrue(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF));
+    assertFalse(ERR, SchedulingPolicy.isApplicableTo(
+        SchedulingPolicy.parse("fifo"), SchedulingPolicy.DEPTH_INTERMEDIATE));
+    assertFalse(ERR, SchedulingPolicy.isApplicableTo(
+        SchedulingPolicy.parse("fifo"), SchedulingPolicy.DEPTH_ROOT));
+
+    
+    // fair
+    policy = SchedulingPolicy.parse("fair"); 
+    assertTrue(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF));
+    assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy,
+        SchedulingPolicy.DEPTH_INTERMEDIATE));
+    assertTrue(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT));
+    assertTrue(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
+    assertTrue(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
+
+    policy = Mockito.mock(SchedulingPolicy.class);
+    Mockito.when(policy.getApplicableDepth()).thenReturn(
+        SchedulingPolicy.DEPTH_PARENT);
+    assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy,
+        SchedulingPolicy.DEPTH_INTERMEDIATE));
+    assertTrue(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT));
+    assertTrue(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT));
+    assertFalse(ERR,
+        SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
+  }
+}

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/index.apt.vm

@@ -47,6 +47,8 @@ MapReduce NextGen aka YARN aka MRv2
 
   * {{{./CapacityScheduler.html}Capacity Scheduler}}
 
+  * {{{./FairScheduler.html}Fair Scheduler}}
+
   * {{{./WebApplicationProxy.html}Web Application Proxy}}
 
   * {{{../../hadoop-project-dist/hadoop-common/CLIMiniCluster.html}CLI MiniCluster}}