YARN-1368. Added core functionality of recovering container state into schedulers after ResourceManager Restart so as to preserve running work in the cluster. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1601303 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli, 11 years ago
commit 424fd9494f
34 changed files with 1128 additions and 132 deletions
  1. hadoop-yarn-project/CHANGES.txt (+4 -0)
  2. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (+7 -0)
  3. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (+8 -0)
  4. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (+2 -0)
  5. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (+10 -0)
  6. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (+8 -2)
  7. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (+9 -6)
  8. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (+24 -16)
  9. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (+30 -12)
  10. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java (+3 -1)
  11. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (+37 -1)
  12. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java (+37 -0)
  13. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (+12 -8)
  14. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java (+38 -0)
  15. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (+95 -2)
  16. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (+23 -0)
  17. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (+11 -0)
  18. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (+21 -1)
  19. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (+13 -9)
  20. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (+0 -10)
  21. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (+2 -0)
  22. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (+16 -11)
  23. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (+8 -4)
  24. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java (+1 -0)
  25. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java (+15 -0)
  26. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (+1 -2)
  27. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (+7 -0)
  28. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (+9 -0)
  29. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (+38 -7)
  30. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (+33 -4)
  31. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java (+9 -6)
  32. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java (+17 -21)
  33. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (+10 -9)
  34. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (+570 -0)
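
Before the per-file diffs, an illustrative sketch (not part of the patch, class name hypothetical) of how a caller would switch the new feature on. It uses only the two configuration keys this commit touches: yarn.resourcemanager.recovery.enabled already existed, and the work-preserving flag added below is @Private and defaults to false.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class EnableWorkPreservingRecovery {
      public static Configuration create() {
        Configuration conf = new YarnConfiguration();
        // Base RM recovery must be on for the work-preserving path to engage.
        conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
        // New flag introduced by this patch; @Private and false by default.
        conf.setBoolean(
            YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
        return conf;
      }
    }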

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -32,6 +32,10 @@ Release 2.5.0 - UNRELEASED
     YARN-1338. Recover localized resource cache state upon nodemanager restart 
     (Jason Lowe via junping_du)
 
+    YARN-1368. Added core functionality of recovering container state into
+    schedulers after ResourceManager Restart so as to preserve running work in
+    the cluster. (Jian He via vinodkv)
+
   IMPROVEMENTS
 
     YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -318,6 +318,13 @@ public class YarnConfiguration extends Configuration {
   public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
   public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
 
+  @Private
+  public static final String RM_WORK_PRESERVING_RECOVERY_ENABLED = RM_PREFIX
+      + "work-preserving-recovery.enabled";
+  @Private
+  public static final boolean DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED =
+      false;
+
   /** Zookeeper interaction configs */
   public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-";
 

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -269,6 +269,14 @@
     <value>false</value>
   </property>
 
+  <property>
+    <description>Enable RM work preserving recovery. This configuration is private
+    to YARN for experimenting the feature.
+    </description>
+    <name>yarn.resourcemanager.work-preserving-recovery.enabled</name>
+    <value>false</value>
+  </property>
+
   <property>
     <description>The class to use as the persistent store.
 

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java

@@ -99,4 +99,6 @@ public interface RMContext {
       RMApplicationHistoryWriter rmApplicationHistoryWriter);
 
   ConfigurationProvider getConfigurationProvider();
+
+  boolean isWorkPreservingRecoveryEnabled();
 }

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

@@ -60,6 +60,7 @@ public class RMContextImpl implements RMContext {
     = new ConcurrentHashMap<String, RMNode>();
 
   private boolean isHAEnabled;
+  private boolean isWorkPreservingRecoveryEnabled;
   private HAServiceState haServiceState =
       HAServiceProtocol.HAServiceState.INITIALIZING;
   
@@ -329,6 +330,15 @@ public class RMContextImpl implements RMContext {
     }
   }
 
+  public void setWorkPreservingRecoveryEnabled(boolean enabled) {
+    this.isWorkPreservingRecoveryEnabled = enabled;
+  }
+
+  @Override
+  public boolean isWorkPreservingRecoveryEnabled() {
+    return this.isWorkPreservingRecoveryEnabled;
+  }
+
   @Override
   public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
     return rmApplicationHistoryWriter;

+ 8 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -364,9 +364,15 @@ public class ResourceManager extends CompositeService implements Recoverable {
           YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
 
       RMStateStore rmStore = null;
-      if(isRecoveryEnabled) {
+      if (isRecoveryEnabled) {
         recoveryEnabled = true;
-        rmStore =  RMStateStoreFactory.getStore(conf);
+        rmStore = RMStateStoreFactory.getStore(conf);
+        boolean isWorkPreservingRecoveryEnabled =
+            conf.getBoolean(
+              YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+              YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
+        rmContext
+          .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
       } else {
         recoveryEnabled = false;
         rmStore = new NullRMStateStore();

+ 9 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -243,11 +244,13 @@ public class ResourceTrackerService extends AbstractService implements
     Resource capability = request.getResource();
     String nodeManagerVersion = request.getNMVersion();
 
-    if (!request.getNMContainerStatuses().isEmpty()) {
-      LOG.info("received container statuses on node manager register :"
-          + request.getNMContainerStatuses());
-      for (NMContainerStatus report : request.getNMContainerStatuses()) {
-        handleNMContainerStatus(report);
+    if (!rmContext.isWorkPreservingRecoveryEnabled()) {
+      if (!request.getNMContainerStatuses().isEmpty()) {
+        LOG.info("received container statuses on node manager register :"
+            + request.getNMContainerStatuses());
+        for (NMContainerStatus status : request.getNMContainerStatuses()) {
+          handleNMContainerStatus(status);
+        }
       }
     }
     RegisterNodeManagerResponse response = recordFactory
@@ -308,7 +311,7 @@ public class ResourceTrackerService extends AbstractService implements
     RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
     if (oldNode == null) {
       this.rmContext.getDispatcher().getEventHandler().handle(
-          new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
+          new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses()));
     } else {
       LOG.info("Reconnect from the node at: " + host);
       this.nmLivelinessMonitor.unregister(nodeId);
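
A condensed, illustrative trace of the new recovery path, using only calls visible in this patch: on registration the container reports now ride along the STARTED event instead of being handled inline, and (see the RMNodeImpl diff below) the AddNodeTransition forwards them to the scheduler.

    // ResourceTrackerService.registerNodeManager(): reports travel with the
    // STARTED event.
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses()));

    // RMNodeImpl.AddNodeTransition: the same reports are handed to the
    // scheduler, which performs the actual container recovery.
    rmNode.context.getDispatcher().getEventHandler()
        .handle(new NodeAddedSchedulerEvent(rmNode, containers));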

+ 24 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -723,29 +723,36 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
   }
 
+  // synchronously recover attempt to ensure any incoming external events
+  // to be processed after the attempt processes the recover event.
+  private void recoverAppAttempts() {
+    for (RMAppAttempt attempt : getAppAttempts().values()) {
+      attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(),
+        RMAppAttemptEventType.RECOVER));
+    }
+  }
+
   private static final class RMAppRecoveredTransition implements
       MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
 
-      for (RMAppAttempt attempt : app.getAppAttempts().values()) {
-        // synchronously recover attempt to ensure any incoming external events
-        // to be processed after the attempt processes the recover event.
-        attempt.handle(
-          new RMAppAttemptEvent(attempt.getAppAttemptId(),
-            RMAppAttemptEventType.RECOVER));
-      }
-
       // The app has completed.
       if (app.recoveredFinalState != null) {
+        app.recoverAppAttempts();
         new FinalTransition(app.recoveredFinalState).transition(app, event);
         return app.recoveredFinalState;
       }
 
-      // Last attempt is in final state, do not add to scheduler and just return
-      // ACCEPTED waiting for last RMAppAttempt to send finished or failed event
-      // back.
+      // Notify scheduler about the app on recovery
+      new AddApplicationToSchedulerTransition().transition(app, event);
+
+      // recover attempts
+      app.recoverAppAttempts();
+
+      // Last attempt is in final state, return ACCEPTED waiting for last
+      // RMAppAttempt to send finished or failed event back.
       if (app.currentAttempt != null
           && (app.currentAttempt.getState() == RMAppAttemptState.KILLED
               || app.currentAttempt.getState() == RMAppAttemptState.FINISHED
@@ -754,9 +761,6 @@ public class RMAppImpl implements RMApp, Recoverable {
         return RMAppState.ACCEPTED;
       }
 
-      // Notify scheduler about the app on recovery
-      new AddApplicationToSchedulerTransition().transition(app, event);
-
       // No existent attempts means the attempt associated with this app was not
       // started or started but not yet saved.
       if (app.attempts.isEmpty()) {
@@ -1055,8 +1059,12 @@ public class RMAppImpl implements RMApp, Recoverable {
       if (app.finishTime == 0 ) {
         app.finishTime = System.currentTimeMillis();
       }
-      app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
-        finalState));
+      // Recovered apps that are completed were not added to scheduler, so no
+      // need to remove them from scheduler.
+      if (app.recoveredFinalState == null) {
+        app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
+          finalState));
+      }
       app.handler.handle(
           new RMAppManagerEvent(app.applicationId,
           RMAppManagerEventType.APP_COMPLETED));

+ 30 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -267,15 +267,17 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.CONTAINER_FINISHED,
           new FinalSavingTransition(
-            new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
+            new AMContainerCrashedBeforeRunningTransition(), RMAppAttemptState.FAILED))
 
        // Transitions from LAUNCHED State
       .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
           RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
-      .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
+      .addTransition(RMAppAttemptState.LAUNCHED,
+          EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING),
           RMAppAttemptEventType.CONTAINER_FINISHED,
-          new FinalSavingTransition(
-            new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
+          new ContainerFinishedTransition(
+            new AMContainerCrashedBeforeRunningTransition(),
+            RMAppAttemptState.LAUNCHED))
       .addTransition(
           RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.EXPIRE,
@@ -302,7 +304,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
           RMAppAttemptState.RUNNING,
           EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
           RMAppAttemptEventType.CONTAINER_FINISHED,
-          new ContainerFinishedTransition())
+          new ContainerFinishedTransition(
+            new AMContainerCrashedAtRunningTransition(),
+            RMAppAttemptState.RUNNING))
       .addTransition(
           RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
           RMAppAttemptEventType.EXPIRE,
@@ -904,6 +908,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
         }
         return appAttempt.recoveredFinalState;
       } else {
+        // Add the current attempt to the scheduler.
+        if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) {
+          appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
+            appAttempt.getAppAttemptId(), false));
+        }
+
         /*
          * Since the application attempt's final state is not saved that means
          * for AM container (previous attempt) state must be one of these.
@@ -1207,17 +1217,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     }
   }
 
-  private static final class AMContainerCrashedTransition extends
+  private static final class AMContainerCrashedBeforeRunningTransition extends
       BaseFinalTransition {
 
-    public AMContainerCrashedTransition() {
+    public AMContainerCrashedBeforeRunningTransition() {
       super(RMAppAttemptState.FAILED);
     }
 
     @Override
     public void transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-
       RMAppAttemptContainerFinishedEvent finishEvent =
           ((RMAppAttemptContainerFinishedEvent)event);
 
@@ -1410,6 +1419,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       implements
       MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
 
+    // The transition To Do after attempt final state is saved.
+    private BaseTransition transitionToDo;
+    private RMAppAttemptState currentState;
+
+    public ContainerFinishedTransition(BaseTransition transitionToDo,
+        RMAppAttemptState currentState) {
+      this.transitionToDo = transitionToDo;
+      this.currentState = currentState;
+    }
+
     @Override
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
@@ -1426,14 +1445,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
             containerStatus.getContainerId())) {
         // Remember the follow up transition and save the final attempt state.
         appAttempt.rememberTargetTransitionsAndStoreState(event,
-          new ContainerFinishedFinalStateSavedTransition(),
-          RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
+          transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
         return RMAppAttemptState.FINAL_SAVING;
       }
 
       // Normal container.Put it in completedcontainers list
       appAttempt.justFinishedContainers.add(containerStatus);
-      return RMAppAttemptState.RUNNING;
+      return this.currentState;
     }
   }
 
@@ -1451,7 +1469,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     }
   }
 
-  private static class ContainerFinishedFinalStateSavedTransition extends
+  private static class AMContainerCrashedAtRunningTransition extends
       BaseTransition {
     @Override
     public void

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java

@@ -33,5 +33,7 @@ public enum RMContainerEventType {
   RELEASED,
 
   // Source: ContainerAllocationExpirer  
-  EXPIRE
+  EXPIRE,
+
+  RECOVER
 }

+ 37 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java

@@ -35,12 +35,14 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -65,6 +67,9 @@ public class RMContainerImpl implements RMContainer {
         RMContainerEventType.KILL)
     .addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
         RMContainerEventType.RESERVED, new ContainerReservedTransition())
+    .addTransition(RMContainerState.NEW,
+        EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
+        RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
 
     // Transitions from RESERVED state
     .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, 
@@ -341,6 +346,38 @@ public class RMContainerImpl implements RMContainer {
     }
   }
 
+  private static final class ContainerRecoveredTransition
+      implements
+      MultipleArcTransition<RMContainerImpl, RMContainerEvent, RMContainerState> {
+    @Override
+    public RMContainerState transition(RMContainerImpl container,
+        RMContainerEvent event) {
+      NMContainerStatus report =
+          ((RMContainerRecoverEvent) event).getContainerReport();
+      if (report.getContainerState().equals(ContainerState.COMPLETE)) {
+        ContainerStatus status =
+            ContainerStatus.newInstance(report.getContainerId(),
+              report.getContainerState(), report.getDiagnostics(),
+              report.getContainerExitStatus());
+
+        new FinishedTransition().transition(container,
+          new RMContainerFinishedEvent(container.containerId, status,
+            RMContainerEventType.FINISHED));
+        return RMContainerState.COMPLETED;
+      } else if (report.getContainerState().equals(ContainerState.RUNNING)) {
+        // Tell the appAttempt
+        container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
+            container.getApplicationAttemptId(), container.getContainer()));
+        return RMContainerState.RUNNING;
+      } else {
+        // This can never happen.
+        LOG.warn("RMContainer received unexpected recover event with container"
+            + " state " + report.getContainerState() + " while recovering.");
+        return RMContainerState.RUNNING;
+      }
+    }
+  }
+
   private static final class ContainerReservedTransition extends
   BaseTransition {
 
@@ -398,7 +435,6 @@ public class RMContainerImpl implements RMContainer {
       // Inform AppAttempt
       container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
           container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
-
       container.rmContext.getRMApplicationHistoryWriter()
           .containerFinished(container);
     }

+ 37 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+
+public class RMContainerRecoverEvent extends RMContainerEvent {
+
+  private final NMContainerStatus containerReport;
+
+  public RMContainerRecoverEvent(ContainerId containerId,
+      NMContainerStatus containerReport) {
+    super(containerId, RMContainerEventType.RECOVER);
+    this.containerReport = containerReport;
+  }
+
+  public NMContainerStatus getContainerReport() {
+    return containerReport;
+  }
+}

+ 12 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
@@ -460,13 +461,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
       // Inform the scheduler
+      RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
+      List<NMContainerStatus> containers = null;
 
-      rmNode.context.getDispatcher().getEventHandler().handle(
-          new NodeAddedSchedulerEvent(rmNode));
-      rmNode.context.getDispatcher().getEventHandler().handle(
-          new NodesListManagerEvent(
-              NodesListManagerEventType.NODE_USABLE, rmNode));
- 
       String host = rmNode.nodeId.getHost();
       if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
         // Old node rejoining
@@ -476,10 +473,17 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       } else {
         // Increment activeNodes explicitly because this is a new node.
         ClusterMetrics.getMetrics().incrNumActiveNodes();
+        containers = startEvent.getContainerRecoveryReports();
       }
+
+      rmNode.context.getDispatcher().getEventHandler()
+        .handle(new NodeAddedSchedulerEvent(rmNode, containers));
+      rmNode.context.getDispatcher().getEventHandler().handle(
+        new NodesListManagerEvent(
+            NodesListManagerEventType.NODE_USABLE, rmNode));
     }
   }
-  
+
   public static class ReconnectNodeTransition implements
       SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
@@ -513,7 +517,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         }
         rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
         rmNode.context.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED));
+            new RMNodeStartedEvent(newNode.getNodeID(), null));
       }
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodesListManagerEvent(

+ 38 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+
+public class RMNodeStartedEvent extends RMNodeEvent {
+
+  private List<NMContainerStatus> containerReports;
+
+  public RMNodeStartedEvent(NodeId nodeId, List<NMContainerStatus> containerReports) {
+    super(nodeId, RMNodeEventType.STARTED);
+    this.containerReports = containerReports;
+  }
+
+  public List<NMContainerStatus> getContainerRecoveryReports() {
+    return this.containerReports;
+  }
+}

+ 95 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -32,14 +32,21 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+@SuppressWarnings("unchecked")
 public abstract class AbstractYarnScheduler
     <T extends SchedulerApplicationAttempt, N extends SchedulerNode>
     extends AbstractService implements ResourceScheduler {
@@ -47,8 +54,7 @@ public abstract class AbstractYarnScheduler
   private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
 
   // Nodes in the cluster, indexed by NodeId
-  protected Map<NodeId, N> nodes =
-      new ConcurrentHashMap<NodeId, N>();
+  protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>();
 
   // Whole capacity of the cluster
   protected Resource clusterResource = Resource.newInstance(0, 0);
@@ -58,6 +64,7 @@ public abstract class AbstractYarnScheduler
 
   protected RMContext rmContext;
   protected Map<ApplicationId, SchedulerApplication<T>> applications;
+
   protected final static List<Container> EMPTY_CONTAINER_LIST =
       new ArrayList<Container>();
   protected static final Allocation EMPTY_ALLOCATION = new Allocation(
@@ -169,4 +176,90 @@ public abstract class AbstractYarnScheduler
     throw new YarnException(getClass().getSimpleName()
         + " does not support moving apps between queues");
   }
+
+  private void killOrphanContainerOnNode(RMNode node,
+      NMContainerStatus container) {
+    if (!container.getContainerState().equals(ContainerState.COMPLETE)) {
+      this.rmContext.getDispatcher().getEventHandler().handle(
+        new RMNodeCleanContainerEvent(node.getNodeID(),
+          container.getContainerId()));
+    }
+  }
+
+  public synchronized void recoverContainersOnNode(
+      List<NMContainerStatus> containerReports, RMNode nm) {
+    if (!rmContext.isWorkPreservingRecoveryEnabled()
+        || containerReports == null
+        || (containerReports != null && containerReports.isEmpty())) {
+      return;
+    }
+
+    for (NMContainerStatus container : containerReports) {
+      ApplicationId appId =
+          container.getContainerId().getApplicationAttemptId().getApplicationId();
+      RMApp rmApp = rmContext.getRMApps().get(appId);
+      if (rmApp == null) {
+        LOG.error("Skip recovering container " + container
+            + " for unknown application.");
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      // Unmanaged AM recovery is addressed in YARN-1815
+      if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
+        LOG.info("Skip recovering container " + container + " for unmanaged AM."
+            + rmApp.getApplicationId());
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      SchedulerApplication<T> schedulerApp = applications.get(appId);
+      if (schedulerApp == null) {
+        LOG.info("Skip recovering container  " + container
+            + " for unknown SchedulerApplication. Application current state is "
+            + rmApp.getState());
+        killOrphanContainerOnNode(nm, container);
+        continue;
+      }
+
+      LOG.info("Recovering container " + container);
+      SchedulerApplicationAttempt schedulerAttempt =
+          schedulerApp.getCurrentAppAttempt();
+
+      // create container
+      RMContainer rmContainer = recoverAndCreateContainer(container, nm);
+
+      // recover RMContainer
+      rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(),
+        container));
+
+      // recover scheduler node
+      nodes.get(nm.getNodeID()).recoverContainer(rmContainer);
+
+      // recover queue: update headroom etc.
+      Queue queue = schedulerAttempt.getQueue();
+      queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
+
+      // recover scheduler attempt
+      schedulerAttempt.recoverContainer(rmContainer);
+    }
+  }
+
+  private RMContainer recoverAndCreateContainer(NMContainerStatus report,
+      RMNode node) {
+    Container container =
+        Container.newInstance(report.getContainerId(), node.getNodeID(),
+          node.getHttpAddress(), report.getAllocatedResource(),
+          report.getPriority(), null);
+    ApplicationAttemptId attemptId =
+        container.getId().getApplicationAttemptId();
+    RMContainer rmContainer =
+        new RMContainerImpl(container, attemptId, node.getNodeID(),
+          applications.get(attemptId.getApplicationId()).getUser(), rmContext);
+    return rmContainer;
+  }
+
+  public SchedulerNode getSchedulerNode(NodeId nodeId) {
+    return nodes.get(nodeId);
+  }
 }
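
Stripped of its guard clauses, recoverContainersOnNode() above replays five steps per reported container; every call below is taken verbatim from the method. Step 2 must run before steps 3-5, because those consult rmContainer.getState() to skip COMPLETED containers.

    // 1. Rebuild the RMContainer from the NMContainerStatus report.
    RMContainer rmContainer = recoverAndCreateContainer(container, nm);
    // 2. Drive its state machine to RUNNING or COMPLETED.
    rmContainer.handle(
        new RMContainerRecoverEvent(container.getContainerId(), container));
    // 3. Re-charge the container against the SchedulerNode's accounting.
    nodes.get(nm.getNodeID()).recoverContainer(rmContainer);
    // 4. Replay queue accounting (used resources, headroom inputs).
    queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
    // 5. Restore the attempt's live-container and consumption state.
    schedulerAttempt.recoverContainer(rmContainer);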

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -409,4 +411,25 @@ public class AppSchedulingInfo {
     //    this.requests = appInfo.getRequests();
     this.blacklist = appInfo.getBlackList();
   }
+
+  public synchronized void recoverContainer(RMContainer rmContainer) {
+    // ContainerIdCounter on recovery will be addressed in YARN-2052
+    this.containerIdCounter.incrementAndGet();
+
+    QueueMetrics metrics = queue.getMetrics();
+    if (pending) {
+      // If there was any container to recover, the application was
+      // running from scheduler's POV.
+      pending = false;
+      metrics.runAppAttempt(applicationId, user);
+    }
+
+    // Container is completed. Skip recovering resources.
+    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      return;
+    }
+
+    metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
+      false);
+  }
 }

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java

@@ -26,6 +26,8 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 
 @Evolving
 @LimitedPrivate("yarn")
@@ -60,4 +62,13 @@ public interface Queue {
   boolean hasAccess(QueueACL acl, UserGroupInformation user);
   
   public ActiveUsersManager getActiveUsersManager();
+
+  /**
+   * Recover the state of the queue for a given container.
+   * @param clusterResource the resource of the cluster
+   * @param schedulerAttempt the application for which the container was allocated
+   * @param rmContainer the container that was recovered.
+   */
+  public void recoverContainer(Resource clusterResource,
+      SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer);
 }
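
A minimal sketch of the contract this new interface method imposes, mirroring the LeafQueue and ParentQueue implementations later in this patch; allocateResource here stands in for whatever internal accounting hook a concrete queue uses.

    @Override
    public void recoverContainer(Resource clusterResource,
        SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
      // Completed containers hold no live resources; nothing to re-charge.
      if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
        return;
      }
      // Careful! Locking order is important (see the LeafQueue diff below).
      synchronized (this) {
        allocateResource(clusterResource, schedulerAttempt,
            rmContainer.getContainer().getResource());
      }
    }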

+ 21 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -534,5 +535,24 @@ public class SchedulerApplicationAttempt {
 
     appSchedulingInfo.move(newQueue);
     this.queue = newQueue;
-  }  
+  }
+
+  public synchronized void recoverContainer(RMContainer rmContainer) {
+    // recover app scheduling info
+    appSchedulingInfo.recoverContainer(rmContainer);
+
+    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      return;
+    }
+    LOG.info("SchedulerAttempt " + getApplicationAttemptId()
+      + " is recovering container " + rmContainer.getContainerId());
+    liveContainers.put(rmContainer.getContainerId(), rmContainer);
+    Resources.addTo(currentConsumption, rmContainer.getContainer()
+      .getResource());
+    // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
+    // is called.
+    // newlyAllocatedContainers.add(rmContainer);
+    // schedulingOpportunities
+    // lastScheduledContainer
+  }
 }

+ 13 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -35,10 +34,10 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.common.base.Preconditions;
 
 /**
  * Represents a YARN Cluster Node from the viewpoint of the scheduler.
@@ -119,13 +118,10 @@ public abstract class SchedulerNode {
    * The Scheduler has allocated containers on this node to the given
    * application.
    * 
-   * @param applicationId
-   *          application
    * @param rmContainer
    *          allocated container
    */
-  public synchronized void allocateContainer(ApplicationId applicationId,
-      RMContainer rmContainer) {
+  public synchronized void allocateContainer(RMContainer rmContainer) {
     Container container = rmContainer.getContainer();
     deductAvailableResource(container.getResource());
     ++numContainers;
@@ -166,8 +162,8 @@ public abstract class SchedulerNode {
     return this.totalResourceCapability;
   }
 
-  private synchronized boolean isValidContainer(Container c) {
-    if (launchedContainers.containsKey(c.getId())) {
+  public synchronized boolean isValidContainer(ContainerId containerId) {
+    if (launchedContainers.containsKey(containerId)) {
       return true;
     }
     return false;
@@ -185,7 +181,7 @@ public abstract class SchedulerNode {
    *          container to be released
    */
   public synchronized void releaseContainer(Container container) {
-    if (!isValidContainer(container)) {
+    if (!isValidContainer(container.getId())) {
       LOG.error("Invalid container released " + container);
       return;
     }
@@ -274,4 +270,12 @@ public abstract class SchedulerNode {
     // we can only adjust available resource if total resource is changed.
     Resources.addTo(this.availableResource, deltaResource);
   }
+
+
+  public synchronized void recoverContainer(RMContainer rmContainer) {
+    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      return;
+    }
+    allocateContainer(rmContainer);
+  }
 }

+ 0 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java

@@ -28,7 +28,6 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
@@ -234,15 +233,6 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    */
   public ActiveUsersManager getActiveUsersManager();
   
-  /**
-   * Recover the state of the queue
-   * @param clusterResource the resource of the cluster
-   * @param application the application for which the container was allocated
-   * @param container the container that was recovered.
-   */
-  public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application, 
-      Container container);
-  
   /**
    * Adds all applications in the queue and its subqueues to the given collection.
    * @param apps the collection to add the applications to

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -872,6 +872,8 @@ public class CapacityScheduler extends
     {
       NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
       addNode(nodeAddedEvent.getAddedRMNode());
+      recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
+        nodeAddedEvent.getAddedRMNode());
     }
     break;
     case NODE_REMOVED:

+ 16 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -59,15 +59,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.annotations.VisibleForTesting;
+
 @Private
 @Unstable
 public class LeafQueue implements CSQueue {
@@ -564,7 +566,8 @@ public class LeafQueue implements CSQueue {
         "numContainers=" + getNumContainers();  
   }
 
-  private synchronized User getUser(String userName) {
+  @VisibleForTesting
+  public synchronized User getUser(String userName) {
     User user = users.get(userName);
     if (user == null) {
       user = new User();
@@ -1346,8 +1349,7 @@ public class LeafQueue implements CSQueue {
       }
 
       // Inform the node
-      node.allocateContainer(application.getApplicationId(), 
-          allocatedContainer);
+      node.allocateContainer(allocatedContainer);
 
       LOG.info("assignedContainer" +
           " application attempt=" + application.getApplicationAttemptId() +
@@ -1446,7 +1448,7 @@ public class LeafQueue implements CSQueue {
   }
 
   synchronized void allocateResource(Resource clusterResource, 
-      FiCaSchedulerApp application, Resource resource) {
+      SchedulerApplicationAttempt application, Resource resource) {
     // Update queue metrics
     Resources.addTo(usedResources, resource);
     CSQueueUtils.updateQueueStatistics(
@@ -1530,7 +1532,8 @@ public class LeafQueue implements CSQueue {
     return metrics;
   }
 
-  static class User {
+  @VisibleForTesting
+  public static class User {
     Resource consumed = Resources.createResource(0, 0);
     int pendingApplications = 0;
     int activeApplications = 0;
@@ -1580,13 +1583,16 @@ public class LeafQueue implements CSQueue {
 
   @Override
   public void recoverContainer(Resource clusterResource,
-      FiCaSchedulerApp application, Container container) {
+      SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
+    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      return;
+    }
     // Careful! Locking order is important! 
     synchronized (this) {
-      allocateResource(clusterResource, application, container.getResource());
+      allocateResource(clusterResource, attempt, rmContainer.getContainer()
+        .getResource());
     }
-    getParent().recoverContainer(clusterResource, application, container);
-
+    getParent().recoverContainer(clusterResource, attempt, rmContainer);
   }
 
   /**
@@ -1613,5 +1619,4 @@ public class LeafQueue implements CSQueue {
       apps.add(app.getApplicationAttemptId());
     }
   }
-
 }

+ 8 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -38,7 +38,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -49,9 +48,11 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -770,13 +771,16 @@ public class ParentQueue implements CSQueue {
   
   @Override
   public void recoverContainer(Resource clusterResource,
-      FiCaSchedulerApp application, Container container) {
+      SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
+    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+      return;
+    }
     // Careful! Locking order is important! 
     synchronized (this) {
-      allocateResource(clusterResource, container.getResource());
+      allocateResource(clusterResource,rmContainer.getContainer().getResource());
     }
     if (parent != null) {
-      parent.recoverContainer(clusterResource, application, container);
+      parent.recoverContainer(clusterResource, attempt, rmContainer);
     }
   }
 

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java

@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java

@@ -18,19 +18,34 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
 
+import java.util.List;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 public class NodeAddedSchedulerEvent extends SchedulerEvent {
 
   private final RMNode rmNode;
+  private final List<NMContainerStatus> containerReports;
 
   public NodeAddedSchedulerEvent(RMNode rmNode) {
     super(SchedulerEventType.NODE_ADDED);
     this.rmNode = rmNode;
+    this.containerReports = null;
+  }
+
+  public NodeAddedSchedulerEvent(RMNode rmNode,
+      List<NMContainerStatus> containerReports) {
+    super(SchedulerEventType.NODE_ADDED);
+    this.rmNode = rmNode;
+    this.containerReports = containerReports;
   }
 
   public RMNode getAddedRMNode() {
     return rmNode;
   }
 
+  public List<NMContainerStatus> getContainerReports() {
+    return containerReports;
+  }
 }

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java

@@ -264,8 +264,7 @@ public class AppSchedulable extends Schedulable {
       }
 
       // Inform the node
-      node.allocateContainer(app.getApplicationId(),
-          allocatedContainer);
+      node.allocateContainer(allocatedContainer);
 
       // If this container is used to run AM, update the leaf queue's AM usage
       if (app.getLiveContainers().size() == 1 &&

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 @Private
@@ -318,4 +319,10 @@ public class FSLeafQueue extends FSQueue {
       Resources.addTo(amResourceUsage, amResource);
     }
   }
+
+  @Override
+  public void recoverContainer(Resource clusterResource,
+      SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+    // TODO Auto-generated method stub
+  }
 }

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java

@@ -35,7 +35,9 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 
 @Private
 @Unstable
@@ -228,4 +230,11 @@ public class FSParentQueue extends FSQueue {
     // Should never be called since all applications are submitted to LeafQueues
     return null;
   }
+
+  @Override
+  public void recoverContainer(Resource clusterResource,
+      SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+    // TODO: fair-scheduler work-preserving recovery is not implemented yet.
+  }
 }
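
Both fair-scheduler overrides above are intentional no-ops: this change wires work-preserving recovery into the FIFO and Capacity schedulers only. A hedged sketch of the shape a later FSLeafQueue implementation might take, mirroring the FifoScheduler logic in the next hunk (the commented-out usage-tracking call is an assumption, not this patch's API):

    @Override
    public void recoverContainer(Resource clusterResource,
        SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
      // Completed containers hold no live resources; nothing to re-add.
      if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
        return;
      }
      // Hypothetical: re-add the recovered container's allocation to this
      // queue's usage and refresh metrics, as FifoScheduler does below.
      // Resources.addTo(usedResources, rmContainer.getAllocatedResource());
    }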

+ 38 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -178,6 +179,17 @@ public class FifoScheduler extends
     public ActiveUsersManager getActiveUsersManager() {
       return activeUsersManager;
     }
+
+    @Override
+    public void recoverContainer(Resource clusterResource,
+        SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+      if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+        return;
+      }
+      increaseUsedResources(rmContainer);
+      updateAppHeadRoom(schedulerAttempt);
+      updateAvailableResourcesMetrics();
+    }
   };
 
   public FifoScheduler() {
@@ -488,7 +500,7 @@ public class FifoScheduler extends
       if (attempt == null) {
         continue;
       }
-      attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
+      updateAppHeadRoom(attempt);
     }
   }
 
@@ -659,11 +671,10 @@ public class FifoScheduler extends
             application.allocate(type, node, priority, request, container);
         
         // Inform the node
-        node.allocateContainer(application.getApplicationId(), 
-            rmContainer);
+        node.allocateContainer(rmContainer);
 
         // Update usage for this container
-        Resources.addTo(usedResource, capability);
+        increaseUsedResources(rmContainer);
       }
 
     }
@@ -707,9 +718,22 @@ public class FifoScheduler extends
       LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
           + node.getAvailableResource());
     }
-    
-    metrics.setAvailableResourcesToQueue(
-        Resources.subtract(clusterResource, usedResource));
+
+    updateAvailableResourcesMetrics();
+  }
+
+  private void increaseUsedResources(RMContainer rmContainer) {
+    Resources.addTo(usedResource, rmContainer.getAllocatedResource());
+  }
+
+  private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) {
+    schedulerAttempt.setHeadroom(Resources.subtract(clusterResource,
+      usedResource));
+  }
+
+  private void updateAvailableResourcesMetrics() {
+    metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource,
+      usedResource));
   }
 
   @Override
@@ -719,6 +743,9 @@ public class FifoScheduler extends
     {
       NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
       addNode(nodeAddedEvent.getAddedRMNode());
+      recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
+        nodeAddedEvent.getAddedRMNode());
+
     }
     break;
     case NODE_REMOVED:
@@ -923,4 +950,8 @@ public class FifoScheduler extends
       return null;
     }
   }
+
+  public Resource getUsedResource() {
+    return usedResource;
+  }
 }
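
The three private helpers above centralize bookkeeping that the allocation path and the new recovery path now share: recovery re-adds each reported container's allocation to usedResource and recomputes headroom and queue metrics exactly as a fresh allocation would. A worked sketch of the arithmetic with illustrative values (an 8 GB / 8-core cluster recovering one 1 GB / 1-core container):

    Resource clusterResource = Resource.newInstance(8 * 1024, 8);
    Resource usedResource = Resource.newInstance(0, 0);

    // increaseUsedResources: add the recovered container's allocation.
    Resources.addTo(usedResource, Resource.newInstance(1024, 1));

    // updateAppHeadRoom and updateAvailableResourcesMetrics both compute
    // cluster minus used; here that is <memory:7168, vCores:7>.
    Resource headroom = Resources.subtract(clusterResource, usedResource);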

+ 33 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -21,10 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
+import java.util.List;
 import java.util.Map;
 
-import org.junit.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
@@ -47,12 +46,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -69,6 +70,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -76,6 +81,7 @@ import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.Assert;
 
 @SuppressWarnings("unchecked")
 public class MockRM extends ResourceManager {
@@ -144,11 +150,26 @@ public class MockRM extends ResourceManager {
     }
   }
 
+  public void waitForContainerToComplete(RMAppAttempt attempt,
+      NMContainerStatus completedContainer) throws InterruptedException {
+    while (true) {
+      List<ContainerStatus> containers = attempt.getJustFinishedContainers();
+      System.out.println("Received completed containers " + containers);
+      for (ContainerStatus container : containers) {
+        if (container.getContainerId().equals(
+          completedContainer.getContainerId())) {
+          return;
+        }
+      }
+      Thread.sleep(200);
+    }
+  }
+
   public void waitForState(MockNM nm, ContainerId containerId,
       RMContainerState containerState) throws Exception {
     RMContainer container = getResourceScheduler().getRMContainer(containerId);
     int timeoutSecs = 0;
-    while(container == null && timeoutSecs++ < 20) {
+    while(container == null && timeoutSecs++ < 100) {
       nm.nodeHeartbeat(true);
       container = getResourceScheduler().getRMContainer(containerId);
       System.out.println("Waiting for container " + containerId + " to be allocated.");
@@ -333,7 +354,7 @@ public class MockRM extends ResourceManager {
   public void sendNodeStarted(MockNM nm) throws Exception {
     RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
         nm.getNodeId());
-    node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(nm.getNodeId(), null));
   }
   
   public void sendNodeLost(MockNM nm) throws Exception {
@@ -542,4 +563,12 @@ public class MockRM extends ResourceManager {
             .newInstance(appId));
     return response.getApplicationReport();
   }
+
+  // Explicitly reset queue metrics for testing.
+  @SuppressWarnings("static-access")
+  public void clearQueueMetrics(RMApp app) {
+    ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) getResourceScheduler())
+      .getSchedulerApplications().get(app.getApplicationId()).getQueue()
+      .getMetrics().clearQueueMetrics();
+  }
 }
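
Together the MockRM additions give restart tests three building blocks: a STARTED event that can carry recovered container reports, a poll that blocks until a specific completed container surfaces on the attempt, and a metrics reset between RM instances. A hedged usage sketch (the rm1/rm2/nm/attempt/app/report objects are assumed test fixtures):

    rm1.clearQueueMetrics(app);            // start queue-metric assertions from zero
    rm1.sendNodeStarted(nm);               // now dispatches RMNodeStartedEvent
    rm2.waitForContainerToComplete(attempt, report);  // block on the NM's report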

+ 9 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java

@@ -18,16 +18,16 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -56,11 +57,10 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 public class TestFifoScheduler {
   private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
@@ -298,7 +298,10 @@ public class TestFifoScheduler {
     FifoScheduler fs = new FifoScheduler();
     fs.init(conf);
     fs.start();
+    // Mock the RMContext to avoid an NPE.
+    RMContext context = mock(RMContext.class);
     fs.reinitialize(conf, null);
+    fs.setRMContext(context);
 
     RMNode n1 =
         MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2");

+ 16 - 21
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java

@@ -43,10 +43,11 @@ import org.junit.Test;
 public class TestMoveApplication {
   private ResourceManager resourceManager = null;
   private static boolean failMove;
-  
+  private Configuration conf;
+
   @Before
   public void setUp() throws Exception {
-    Configuration conf = new YarnConfiguration();
+    conf = new YarnConfiguration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class,
         FifoSchedulerWithMove.class);
     conf.set(YarnConfiguration.YARN_ADMIN_ACL, " ");
@@ -119,28 +120,23 @@ public class TestMoveApplication {
     }
   }
   
-  @Test (timeout = 5000)
-  public void testMoveSuccessful() throws Exception {
-    // Submit application
-    Application application = new Application("user1", resourceManager);
-    ApplicationId appId = application.getApplicationId();
-    application.submit();
-    
-    // Wait for app to be accepted
-    RMApp app = resourceManager.rmContext.getRMApps().get(appId);
-    while (app.getState() != RMAppState.ACCEPTED) {
-      Thread.sleep(100);
-    }
-
-    ClientRMService clientRMService = resourceManager.getClientRMService();
+  @Test (timeout = 10000)
+  public void testMoveSuccessful() throws Exception {
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    RMApp app = rm1.submitApp(1024);
+    ClientRMService clientRMService = rm1.getClientRMService();
     // FIFO scheduler does not support moves
-    clientRMService.moveApplicationAcrossQueues(
-        MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue"));
-    
-    RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId);
+    clientRMService
+      .moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest
+        .newInstance(app.getApplicationId(), "newqueue"));
+
+    RMApp rmApp = rm1.getRMContext().getRMApps().get(app.getApplicationId());
     assertEquals("newqueue", rmApp.getQueue());
+    rm1.stop();
   }
-  
+
   @Test
   public void testMoveRejectedByPermissions() throws Exception {
     failMove = true;

+ 10 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -21,15 +21,14 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.junit.Assert;
-
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -160,7 +161,7 @@ public class TestRMNodeTransitions {
   @Test (timeout = 5000)
   public void testExpiredContainer() {
     // Start the node
-    node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(null, null));
     verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
     
     // Expire a container
@@ -188,11 +189,11 @@ public class TestRMNodeTransitions {
   @Test (timeout = 5000)
   public void testContainerUpdate() throws InterruptedException{
     //Start the node
-    node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(null, null));
     
     NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
     RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
-    node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    node2.handle(new RMNodeStartedEvent(null, null));
     
     ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
         BuilderUtils.newApplicationAttemptId(
@@ -248,7 +249,7 @@ public class TestRMNodeTransitions {
   @Test (timeout = 5000)
   public void testStatusChange(){
     //Start the node
-    node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(null, null));
     //Add info to the queue first
     node.setNextHeartBeat(false);
 
@@ -464,7 +465,7 @@ public class TestRMNodeTransitions {
     RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
         null, ResourceOption.newInstance(capability,
             RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion);
-    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
     Assert.assertEquals(NodeState.RUNNING, node.getState());
     return node;
   }
@@ -495,7 +496,7 @@ public class TestRMNodeTransitions {
     int initialUnhealthy = cm.getUnhealthyNMs();
     int initialDecommissioned = cm.getNumDecommisionedNMs();
     int initialRebooted = cm.getNumRebootedNMs();
-    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+    node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
     Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
     Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
     Assert.assertEquals("Unhealthy Nodes",

+ 570 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java

@@ -0,0 +1,570 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+@RunWith(value = Parameterized.class)
+public class TestWorkPreservingRMRestart {
+
+  private YarnConfiguration conf;
+  private Class<?> schedulerClass;
+  MockRM rm1 = null;
+  MockRM rm2 = null;
+
+  @Before
+  public void setup() throws UnknownHostException {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    conf = new YarnConfiguration();
+    UserGroupInformation.setConfiguration(conf);
+    conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerClass,
+      ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+    DefaultMetricsSystem.setMiniClusterMode(true);
+  }
+
+  @After
+  public void tearDown() {
+    if (rm1 != null) {
+      rm1.stop();
+    }
+    if (rm2 != null) {
+      rm2.stop();
+    }
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> getTestParameters() {
+    return Arrays.asList(new Object[][] { { CapacityScheduler.class },
+        { FifoScheduler.class } });
+  }
+
+  public TestWorkPreservingRMRestart(Class<?> schedulerClass) {
+    this.schedulerClass = schedulerClass;
+  }
+
+  // Test that common scheduler state, including SchedulerAttempt,
+  // SchedulerNode and AppSchedulingInfo, can be reconstructed from the
+  // container recovery reports sent on NM re-registration.
+  // Also test scheduler-specific changes, i.e. queue recovery:
+  // CSQueue/FSQueue/FifoQueue recovery respectively.
+  // Test strategy: send 3 container recovery reports (AM container, running
+  // container, completed container) on NM re-registration, then check that
+  // SchedulerAttempt, SchedulerNode etc. are updated accordingly.
+  @Test(timeout = 20000)
+  public void testSchedulerRecovery() throws Exception {
+    conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+    conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+      DominantResourceCalculator.class.getName());
+
+    int containerMemory = 1024;
+    Resource containerResource = Resource.newInstance(containerMemory, 1);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // clear queue metrics
+    rm1.clearQueueMetrics(app1);
+
+    // Re-start RM
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    // recover app
+    RMApp recoveredApp1 =
+        rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+    RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt();
+    NMContainerStatus amContainer =
+        TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 1,
+          ContainerState.RUNNING);
+    NMContainerStatus runningContainer =
+        TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2,
+          ContainerState.RUNNING);
+    NMContainerStatus completedContainer =
+        TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
+          ContainerState.COMPLETE);
+
+    nm1.registerNode(Arrays.asList(amContainer, runningContainer,
+      completedContainer));
+
+    // Wait for the RM to settle down while recovering containers.
+    waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
+
+    // Check that RMContainers are re-created and the container states are correct.
+    rm2.waitForState(nm1, amContainer.getContainerId(),
+      RMContainerState.RUNNING);
+    rm2.waitForState(nm1, runningContainer.getContainerId(),
+      RMContainerState.RUNNING);
+    rm2.waitForContainerToComplete(loadedAttempt1, completedContainer);
+
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm2.getResourceScheduler();
+    SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId());
+
+    // ********* check scheduler node state.*******
+    // 2 running containers.
+    Resource usedResources = Resources.multiply(containerResource, 2);
+    Resource nmResource =
+        Resource.newInstance(nm1.getMemory(), nm1.getvCores());
+
+    assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
+    assertTrue(schedulerNode1.isValidContainer(runningContainer
+      .getContainerId()));
+    assertFalse(schedulerNode1.isValidContainer(completedContainer
+      .getContainerId()));
+    // 2 launched containers, 1 completed container
+    assertEquals(2, schedulerNode1.getNumContainers());
+
+    assertEquals(Resources.subtract(nmResource, usedResources),
+      schedulerNode1.getAvailableResource());
+    assertEquals(usedResources, schedulerNode1.getUsedResource());
+    Resource availableResources = Resources.subtract(nmResource, usedResources);
+
+    // ***** check queue state based on the underlying scheduler ********
+    Map<ApplicationId, SchedulerApplication> schedulerApps =
+        ((AbstractYarnScheduler) rm2.getResourceScheduler())
+          .getSchedulerApplications();
+    SchedulerApplication schedulerApp =
+        schedulerApps.get(recoveredApp1.getApplicationId());
+
+    if (schedulerClass.equals(CapacityScheduler.class)) {
+      checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
+    } else if (schedulerClass.equals(FifoScheduler.class)) {
+      checkFifoQueue(schedulerApp, usedResources, availableResources);
+    }
+
+    // *********** check scheduler attempt state.********
+    SchedulerApplicationAttempt schedulerAttempt =
+        schedulerApp.getCurrentAppAttempt();
+    assertTrue(schedulerAttempt.getLiveContainers().contains(
+      scheduler.getRMContainer(amContainer.getContainerId())));
+    assertTrue(schedulerAttempt.getLiveContainers().contains(
+      scheduler.getRMContainer(runningContainer.getContainerId())));
+    assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources);
+    assertEquals(availableResources, schedulerAttempt.getHeadroom());
+
+    // *********** check appSchedulingInfo state ***********
+    assertEquals(4, schedulerAttempt.getNewContainerId());
+  }
+
+  private void checkCSQueue(MockRM rm,
+      SchedulerApplication<SchedulerApplicationAttempt> app,
+      Resource clusterResource, Resource queueResource, Resource usedResource,
+      int numContainers)
+      throws Exception {
+    checkCSLeafQueue(rm, app, clusterResource, queueResource, usedResource,
+      numContainers);
+
+    LeafQueue queue = (LeafQueue) app.getQueue();
+    Resource availableResources = Resources.subtract(queueResource, usedResource);
+    // ************* check Queue metrics ************
+    QueueMetrics queueMetrics = queue.getMetrics();
+    assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
+      availableResources.getVirtualCores(), usedResource.getMemory(),
+      usedResource.getVirtualCores());
+
+    // ************ check user metrics ***********
+    QueueMetrics userMetrics =
+        queueMetrics.getUserMetrics(app.getUser());
+    assertMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
+      availableResources.getVirtualCores(), usedResource.getMemory(),
+      usedResource.getVirtualCores());
+  }
+
+  private void checkCSLeafQueue(MockRM rm,
+      SchedulerApplication<SchedulerApplicationAttempt> app,
+      Resource clusterResource, Resource queueResource, Resource usedResource,
+      int numContainers) {
+    LeafQueue leafQueue = (LeafQueue) app.getQueue();
+    // assert queue used resources.
+    assertEquals(usedResource, leafQueue.getUsedResources());
+    assertEquals(numContainers, leafQueue.getNumContainers());
+
+    ResourceCalculator calc =
+        ((CapacityScheduler) rm.getResourceScheduler()).getResourceCalculator();
+    float usedCapacity =
+        Resources.divide(calc, clusterResource, usedResource, queueResource);
+    // assert queue used capacity
+    assertEquals(usedCapacity, leafQueue.getUsedCapacity(), 1e-8);
+    float absoluteUsedCapacity =
+        Resources.divide(calc, clusterResource, usedResource, clusterResource);
+    // assert queue absolute capacity
+    assertEquals(absoluteUsedCapacity, leafQueue.getAbsoluteUsedCapacity(),
+      1e-8);
+    // assert user consumed resources.
+    assertEquals(usedResource, leafQueue.getUser(app.getUser())
+      .getConsumedResources());
+  }
+
+  private void checkFifoQueue(SchedulerApplication schedulerApp,
+      Resource usedResources, Resource availableResources) throws Exception {
+    FifoScheduler scheduler = (FifoScheduler) rm2.getResourceScheduler();
+    // ************ check cluster used Resources ********
+    assertEquals(usedResources, scheduler.getUsedResource());
+
+    // ************ check app headroom ****************
+    SchedulerApplicationAttempt schedulerAttempt =
+        schedulerApp.getCurrentAppAttempt();
+    assertEquals(availableResources, schedulerAttempt.getHeadroom());
+
+    // ************ check queue metrics ****************
+    QueueMetrics queueMetrics = scheduler.getRootQueueMetrics();
+    assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
+      availableResources.getVirtualCores(), usedResources.getMemory(),
+      usedResources.getVirtualCores());
+  }
+
+  // Create 3 container reports for the given AM's attempt.
+  public static List<NMContainerStatus>
+      createNMContainerStatusForApp(MockAM am) {
+    List<NMContainerStatus> list =
+        new ArrayList<NMContainerStatus>();
+    NMContainerStatus amContainer =
+        TestRMRestart.createNMContainerStatus(am.getApplicationAttemptId(), 1,
+          ContainerState.RUNNING);
+    NMContainerStatus runningContainer =
+        TestRMRestart.createNMContainerStatus(am.getApplicationAttemptId(), 2,
+          ContainerState.RUNNING);
+    NMContainerStatus completedContainer =
+        TestRMRestart.createNMContainerStatus(am.getApplicationAttemptId(), 3,
+          ContainerState.COMPLETE);
+    list.add(amContainer);
+    list.add(runningContainer);
+    list.add(completedContainer);
+    return list;
+  }
+
+  private static final String R = "Default";
+  private static final String A = "QueueA";
+  private static final String B = "QueueB";
+  private static final String USER_1 = "user1";
+  private static final String USER_2 = "user2";
+
+  private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R });
+    final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R;
+    conf.setCapacity(Q_R, 100);
+    final String Q_A = Q_R + "." + A;
+    final String Q_B = Q_R + "." + B;
+    conf.setQueues(Q_R, new String[] {A, B});
+    conf.setCapacity(Q_A, 50);
+    conf.setCapacity(Q_B, 50);
+    conf.setDouble(CapacitySchedulerConfiguration
+      .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f);
+  }
+
+  // Test CS recovery with multi-level queues and multiple users:
+  // 1. Set up 2 NMs, each with 8GB memory.
+  // 2. Set up 2-level queues: Default -> (QueueA, QueueB).
+  // 3. User1 submits 2 apps on QueueA.
+  // 4. User2 submits 1 app on QueueB.
+  // 5. The AM and each container use 1GB memory.
+  // 6. Restart the RM.
+  // 7. nm1 re-syncs back the containers belonging to user1.
+  // 8. nm2 re-syncs back the containers belonging to user2.
+  // 9. Assert the state and metrics of the parent queue and the 2 leaf queues.
+  // 10. Assert each user's consumption inside its queue.
+  @Test (timeout = 30000)
+  public void testCapacitySchedulerRecovery() throws Exception {
+    if (!schedulerClass.equals(CapacityScheduler.class)) {
+      return;
+    }
+    conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+    conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+      DominantResourceCalculator.class.getName());
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(conf);
+    setupQueueConfiguration(csConf);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(csConf);
+    rm1 = new MockRM(csConf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    MockNM nm2 =
+        new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    nm2.registerNode();
+    RMApp app1_1 = rm1.submitApp(1024, "app1_1", USER_1, null, A);
+    MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1);
+    RMApp app1_2 = rm1.submitApp(1024, "app1_2", USER_1, null, A);
+    MockAM am1_2 = MockRM.launchAndRegisterAM(app1_2, rm1, nm2);
+
+    RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B);
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // clear queue metrics
+    rm1.clearQueueMetrics(app1_1);
+    rm1.clearQueueMetrics(app1_2);
+    rm1.clearQueueMetrics(app2);
+
+    // Re-start RM
+    rm2 = new MockRM(csConf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm2.setResourceTrackerService(rm2.getResourceTrackerService());
+
+    List<NMContainerStatus> am1_1Containers =
+        createNMContainerStatusForApp(am1_1);
+    List<NMContainerStatus> am1_2Containers =
+        createNMContainerStatusForApp(am1_2);
+    am1_1Containers.addAll(am1_2Containers);
+    nm1.registerNode(am1_1Containers);
+
+    List<NMContainerStatus> am2Containers =
+        createNMContainerStatusForApp(am2);
+    nm2.registerNode(am2Containers);
+
+    // Wait for the RM to settle down while recovering containers.
+    waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId());
+    waitForNumContainersToRecover(2, rm2, am1_2.getApplicationAttemptId());
+    waitForNumContainersToRecover(2, rm2, am2.getApplicationAttemptId());
+
+    // Calculate each queue's resource usage.
+    Resource containerResource = Resource.newInstance(1024, 1);
+    Resource nmResource =
+        Resource.newInstance(nm1.getMemory(), nm1.getvCores());
+    Resource clusterResource = Resources.multiply(nmResource, 2);
+    Resource q1Resource = Resources.multiply(clusterResource, 0.5);
+    Resource q2Resource = Resources.multiply(clusterResource, 0.5);
+    Resource q1UsedResource = Resources.multiply(containerResource, 4);
+    Resource q2UsedResource = Resources.multiply(containerResource, 2);
+    Resource totalUsedResource = Resources.add(q1UsedResource, q2UsedResource);
+    Resource q1availableResources =
+        Resources.subtract(q1Resource, q1UsedResource);
+    Resource q2availableResources =
+        Resources.subtract(q2Resource, q2UsedResource);
+    Resource totalAvailableResource =
+        Resources.add(q1availableResources, q2availableResources);
+
+    Map<ApplicationId, SchedulerApplication> schedulerApps =
+        ((AbstractYarnScheduler) rm2.getResourceScheduler())
+          .getSchedulerApplications();
+    SchedulerApplication schedulerApp1_1 =
+        schedulerApps.get(app1_1.getApplicationId());
+
+    // assert queue A state.
+    checkCSLeafQueue(rm2, schedulerApp1_1, clusterResource, q1Resource,
+      q1UsedResource, 4);
+    QueueMetrics queue1Metrics = schedulerApp1_1.getQueue().getMetrics();
+    assertMetrics(queue1Metrics, 2, 0, 2, 0, 4,
+      q1availableResources.getMemory(), q1availableResources.getVirtualCores(),
+      q1UsedResource.getMemory(), q1UsedResource.getVirtualCores());
+
+    // assert queue B state.
+    SchedulerApplication schedulerApp2 =
+        schedulerApps.get(app2.getApplicationId());
+    checkCSLeafQueue(rm2, schedulerApp2, clusterResource, q2Resource,
+      q2UsedResource, 2);
+    QueueMetrics queue2Metrics = schedulerApp2.getQueue().getMetrics();
+    assertMetrics(queue2Metrics, 1, 0, 1, 0, 2,
+      q2availableResources.getMemory(), q2availableResources.getVirtualCores(),
+      q2UsedResource.getMemory(), q2UsedResource.getVirtualCores());
+
+    // assert parent queue state.
+    LeafQueue leafQueue = (LeafQueue) schedulerApp2.getQueue();
+    ParentQueue parentQueue = (ParentQueue) leafQueue.getParent();
+    checkParentQueue(parentQueue, 6, totalUsedResource, (float) 6 / 16,
+      (float) 6 / 16);
+    assertMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6,
+      totalAvailableResource.getMemory(),
+      totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
+      totalUsedResource.getVirtualCores());
+  }
+
+  private void checkParentQueue(ParentQueue parentQueue, int numContainers,
+      Resource usedResource, float usedCapacity, float absoluteUsedCapacity) {
+    assertEquals(numContainers, parentQueue.getNumContainers());
+    assertEquals(usedResource, parentQueue.getUsedResources());
+    assertEquals(usedCapacity, parentQueue.getUsedCapacity(), 1e-8);
+    assertEquals(absoluteUsedCapacity, parentQueue.getAbsoluteUsedCapacity(), 1e-8);
+  }
+
+  // Test that when the RM shuts down and the AM fails in the meanwhile, the
+  // restarted RM scheduler does not recover containers belonging to the failed AM.
+  @Test(timeout = 20000)
+  public void testAMfailedBetweenRMRestart() throws Exception {
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+    NMContainerStatus amContainer =
+        TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 1,
+          ContainerState.COMPLETE);
+    NMContainerStatus runningContainer =
+        TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2,
+          ContainerState.RUNNING);
+    NMContainerStatus completedContainer =
+        TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
+          ContainerState.COMPLETE);
+    nm1.registerNode(Arrays.asList(amContainer, runningContainer,
+      completedContainer));
+    rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+    // Wait for the RM to settle down while recovering containers.
+    Thread.sleep(3000);
+
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm2.getResourceScheduler();
+    // The previous AM failed, so the scheduler should release the
+    // just-recovered containers that belonged to it.
+    assertNull(scheduler.getRMContainer(runningContainer.getContainerId()));
+    assertNull(scheduler.getRMContainer(completedContainer.getContainerId()));
+  }
+
+  // Apps already completed before the RM restarted; the restarted RM
+  // scheduler should not recover containers for completed apps.
+  @Test(timeout = 20000)
+  public void testContainersNotRecoveredForCompletedApps() throws Exception {
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1 = rm1.submitApp(200);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1);
+
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    NMContainerStatus runningContainer =
+        TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2,
+          ContainerState.RUNNING);
+    NMContainerStatus completedContainer =
+        TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
+          ContainerState.COMPLETE);
+    nm1.registerNode(Arrays.asList(runningContainer, completedContainer));
+    RMApp recoveredApp1 =
+        rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+    assertEquals(RMAppState.FINISHED, recoveredApp1.getState());
+
+    // Wait for the RM to settle down while recovering containers.
+    Thread.sleep(3000);
+
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm2.getResourceScheduler();
+
+    // scheduler should not recover containers for finished apps.
+    assertNull(scheduler.getRMContainer(runningContainer.getContainerId()));
+    assertNull(scheduler.getRMContainer(completedContainer.getContainerId()));
+  }
+
+  private void assertMetrics(QueueMetrics qm, int appsSubmitted,
+      int appsPending, int appsRunning, int appsCompleted,
+      int allocatedContainers, int availableMB, int availableVirtualCores,
+      int allocatedMB, int allocatedVirtualCores) {
+    assertEquals(appsSubmitted, qm.getAppsSubmitted());
+    assertEquals(appsPending, qm.getAppsPending());
+    assertEquals(appsRunning, qm.getAppsRunning());
+    assertEquals(appsCompleted, qm.getAppsCompleted());
+    assertEquals(allocatedContainers, qm.getAllocatedContainers());
+    assertEquals(availableMB, qm.getAvailableMB());
+    assertEquals(availableVirtualCores, qm.getAvailableVirtualCores());
+    assertEquals(allocatedMB, qm.getAllocatedMB());
+    assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores());
+  }
+
+  private void waitForNumContainersToRecover(int num, MockRM rm,
+      ApplicationAttemptId attemptId) throws Exception {
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+    SchedulerApplicationAttempt attempt =
+        scheduler.getApplicationAttempt(attemptId);
+    while (attempt == null) {
+      System.out.println("Wait for scheduler attempt " + attemptId
+          + " to be created");
+      Thread.sleep(200);
+      attempt = scheduler.getApplicationAttempt(attemptId);
+    }
+    while (attempt.getLiveContainers().size() < num) {
+      System.out.println("Wait for " + num + " containers to recover.");
+      Thread.sleep(200);
+    }
+  }
+}
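
One caveat: waitForNumContainersToRecover and waitForContainerToComplete spin indefinitely, relying on each test's @Test timeout to abort a failed recovery. A hedged variant with its own deadline might look like this (a sketch, not part of the patch):

    private void waitForNumContainersToRecover(int num, MockRM rm,
        ApplicationAttemptId attemptId, long timeoutMs) throws Exception {
      AbstractYarnScheduler scheduler =
          (AbstractYarnScheduler) rm.getResourceScheduler();
      long deadline = System.currentTimeMillis() + timeoutMs;
      SchedulerApplicationAttempt attempt =
          scheduler.getApplicationAttempt(attemptId);
      while (attempt == null || attempt.getLiveContainers().size() < num) {
        if (System.currentTimeMillis() > deadline) {
          throw new AssertionError("Timed out waiting for " + num
              + " containers of attempt " + attemptId + " to recover");
        }
        Thread.sleep(200);
        attempt = scheduler.getApplicationAttempt(attemptId);
      }
    }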