
YARN-1408 Preemption caused Invalid State Event: ACQUIRED at KILLED and caused a task timeout for 30mins. (Sunil G via mayank)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1610860 13f79535-47bb-0310-9956-ffa450edef68
Mayank Bansal, 10 years ago
Parent
Current commit
43589a8df7
14 files changed, 352 insertions and 24 deletions
  1. +3 -0
      hadoop-yarn-project/CHANGES.txt
  2. +5 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
  3. +27 -1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
  4. +22 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
  5. +50 -19
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
  6. +8 -1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
  7. +1 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  8. +9 -1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
  9. +9 -1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
  10. +2 -1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  11. +42 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
  12. +67 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
  13. +21 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
  14. +86 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
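
Taken together, the change works like this: when the scheduler allocates a container, it now clones the ResourceRequests that satisfied it (one container's worth each) and stashes them on the RMContainer; if the container is preempted while still ALLOCATED (before the AM pulls it), the stored clones are added back to the scheduler so the work gets rescheduled instead of silently lost; once the AM acquires the container, the stash is cleared. Below is a minimal, self-contained sketch of that bookkeeping — all names here are invented for illustration, not the actual YARN classes:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RecoveryDemo {
  public static void main(String[] args) {
    // The scheduler's pending ask book: resource name -> #containers wanted
    Map<String, Integer> pending = new HashMap<>();
    pending.put("host1", 1);
    pending.put("/rack1", 1);
    pending.put("*", 1);

    // NEW -> ALLOCATED: decrement the asks and keep one-container clones
    // on the container object, in case it is preempted before acquisition
    List<String> stored = new ArrayList<>(pending.keySet());
    pending.replaceAll((name, num) -> num - 1);
    System.out.println("after allocation: " + pending); // all zero

    // Preempted while still ALLOCATED: the clones are present, put them back
    if (stored != null) {
      for (String name : stored) {
        pending.merge(name, 1, Integer::sum);
      }
    }
    System.out.println("after recovery:  " + pending); // demand restored

    // Had the AM acquired the container first (ALLOCATED -> ACQUIRED),
    // stored would have been cleared to null and recovery would be a no-op.
  }
}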

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -259,6 +259,9 @@ Release 2.5.0 - UNRELEASED
     YARN-2241. ZKRMStateStore: On startup, show nicer messages if znodes already 
     exist. (Robert Kanter via kasha)
 
+	YARN-1408 Preemption caused Invalid State Event: ACQUIRED at KILLED and 
+	caused a task timeout for 30mins. (Sunil G via mayank)
+
   OPTIMIZATIONS
 
   BUG FIXES 

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
+import java.util.List;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 /**
@@ -73,5 +76,7 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
   ContainerReport createContainerReport();
   
   boolean isAMContainer();
+  
+  List<ResourceRequest> getResourceRequests();
 
 }

+ 27 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
 import java.util.EnumSet;
+import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -158,6 +160,7 @@ public class RMContainerImpl implements RMContainer {
   private long finishTime;
   private ContainerStatus finishedStatus;
   private boolean isAMContainer;
+  private List<ResourceRequest> resourceRequests;
 
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -180,7 +183,8 @@ public class RMContainerImpl implements RMContainer {
     this.eventHandler = rmContext.getDispatcher().getEventHandler();
     this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
     this.isAMContainer = false;
-    
+    this.resourceRequests = null;
+
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -311,6 +315,25 @@ public class RMContainerImpl implements RMContainer {
       readLock.unlock();
     }
   }
+  
+  @Override
+  public List<ResourceRequest> getResourceRequests() {
+    try {
+      readLock.lock();
+      return resourceRequests;
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  public void setResourceRequests(List<ResourceRequest> requests) {
+    try {
+      writeLock.lock();
+      this.resourceRequests = requests;
+    } finally {
+      writeLock.unlock();
+    }
+  }
 
   @Override
   public String toString() {
@@ -432,6 +455,9 @@ public class RMContainerImpl implements RMContainer {
 
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
+      // Clear ResourceRequest stored in RMContainer
+      container.setResourceRequests(null);
+      
       // Register with containerAllocationExpirer.
       container.containerAllocationExpirer.register(container.getContainerId());
 

+ 22 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -275,6 +276,27 @@ public abstract class AbstractYarnScheduler
     return rmContainer;
   }
 
+  /**
+   * Recover resource request back from RMContainer when a container is 
+   * preempted before AM pulled the same. If container is pulled by
+   * AM, then RMContainer will not have resource request to recover.
+   * @param rmContainer
+   */
+  protected void recoverResourceRequestForContainer(RMContainer rmContainer) {
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+
+    // If container state is moved to ACQUIRED, request will be empty.
+    if (requests == null) {
+      return;
+    }
+    // Add resource request back to Scheduler.
+    SchedulerApplicationAttempt schedulerAttempt 
+        = getCurrentAttemptForContainer(rmContainer.getContainerId());
+    if (schedulerAttempt != null) {
+      schedulerAttempt.recoverResourceRequests(requests);
+    }
+  }
+
   public SchedulerNode getSchedulerNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
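
The new method above is deliberately defensive: the stored requests are non-null only between ALLOCATED and ACQUIRED, and the application attempt may already have finished by the time the kill arrives. A compact model of the two guards, with hypothetical names standing in for the YARN types:

import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class RecoverGuard {
  // Simplified mirror of recoverResourceRequestForContainer's structure
  static void recover(List<String> stored, Consumer<List<String>> attempt) {
    if (stored == null) {
      return; // AM already pulled the container; nothing to put back
    }
    if (attempt != null) {
      attempt.accept(stored); // re-add the asks to the scheduler
    }
  }

  public static void main(String[] args) {
    recover(null, asks -> System.out.println("never runs"));
    recover(Arrays.asList("host1", "/rack1", "*"),
        asks -> System.out.println("re-adding: " + asks));
  }
}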

+ 50 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -127,9 +127,10 @@ public class AppSchedulingInfo {
    * by the application.
    *
    * @param requests resources to be acquired
+   * @param recoverPreemptedRequest recover Resource Request on preemption
    */
   synchronized public void updateResourceRequests(
-      List<ResourceRequest> requests) {
+      List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
     QueueMetrics metrics = queue.getMetrics();
     
     // Update resource requests
@@ -163,8 +164,13 @@ public class AppSchedulingInfo {
         asks = new HashMap<String, ResourceRequest>();
         this.requests.put(priority, asks);
         this.priorities.add(priority);
-      } else if (updatePendingResources) {
-        lastRequest = asks.get(resourceName);
+      }
+      lastRequest = asks.get(resourceName);
+
+      if (recoverPreemptedRequest && lastRequest != null) {
+        // Increment the number of containers to 1, as it is recovering a
+        // single container.
+        request.setNumContainers(lastRequest.getNumContainers() + 1);
       }
 
       asks.put(resourceName, request);
@@ -254,14 +260,16 @@ public class AppSchedulingInfo {
    * @param container
    *          the containers allocated.
    */
-  synchronized public void allocate(NodeType type, SchedulerNode node,
-      Priority priority, ResourceRequest request, Container container) {
+  synchronized public List<ResourceRequest> allocate(NodeType type,
+      SchedulerNode node, Priority priority, ResourceRequest request,
+      Container container) {
+    List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>();
     if (type == NodeType.NODE_LOCAL) {
-      allocateNodeLocal(node, priority, request, container);
+      allocateNodeLocal(node, priority, request, container, resourceRequests);
     } else if (type == NodeType.RACK_LOCAL) {
-      allocateRackLocal(node, priority, request, container);
+      allocateRackLocal(node, priority, request, container, resourceRequests);
     } else {
-      allocateOffSwitch(node, priority, request, container);
+      allocateOffSwitch(node, priority, request, container, resourceRequests);
     }
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
@@ -279,6 +287,7 @@ public class AppSchedulingInfo {
           + " resource=" + request.getCapability());
     }
     metrics.allocateResources(user, 1, request.getCapability(), true);
+    return resourceRequests;
   }
 
   /**
@@ -288,9 +297,9 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateNodeLocal( 
-      SchedulerNode node, Priority priority, 
-      ResourceRequest nodeLocalRequest, Container container) {
+  synchronized private void allocateNodeLocal(SchedulerNode node,
+      Priority priority, ResourceRequest nodeLocalRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
     if (nodeLocalRequest.getNumContainers() == 0) {
@@ -304,7 +313,14 @@ public class AppSchedulingInfo {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+    ResourceRequest offRackRequest = requests.get(priority).get(
+        ResourceRequest.ANY);
+    decrementOutstanding(offRackRequest);
+
+    // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
   }
 
   /**
@@ -314,16 +330,22 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateRackLocal(
-      SchedulerNode node, Priority priority,
-      ResourceRequest rackLocalRequest, Container container) {
+  synchronized private void allocateRackLocal(SchedulerNode node,
+      Priority priority, ResourceRequest rackLocalRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
     if (rackLocalRequest.getNumContainers() == 0) {
       this.requests.get(priority).remove(node.getRackName());
     }
 
-    decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+    ResourceRequest offRackRequest = requests.get(priority).get(
+        ResourceRequest.ANY);
+    decrementOutstanding(offRackRequest);
+
+    // Update cloned RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+    resourceRequests.add(cloneResourceRequest(offRackRequest));
   }
 
   /**
@@ -333,11 +355,13 @@ public class AppSchedulingInfo {
    * @param allocatedContainers
    *          resources allocated to the application
    */
-  synchronized private void allocateOffSwitch(
-      SchedulerNode node, Priority priority,
-      ResourceRequest offSwitchRequest, Container container) {
+  synchronized private void allocateOffSwitch(SchedulerNode node,
+      Priority priority, ResourceRequest offSwitchRequest, Container container,
+      List<ResourceRequest> resourceRequests) {
     // Update future requirements
     decrementOutstanding(offSwitchRequest);
+    // Update cloned RackLocal and OffRack requests for recovery
+    resourceRequests.add(cloneResourceRequest(offSwitchRequest));
   }
 
   synchronized private void decrementOutstanding(
@@ -436,4 +460,11 @@ public class AppSchedulingInfo {
     metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
       false);
   }
+  
+  public ResourceRequest cloneResourceRequest(ResourceRequest request) {
+    ResourceRequest newRequest = ResourceRequest.newInstance(
+        request.getPriority(), request.getResourceName(),
+        request.getCapability(), 1, request.getRelaxLocality());
+    return newRequest;
+  }
 }
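
Note that cloneResourceRequest pins numContainers to 1 because each stored clone stands for exactly one preemptable container, and the recovery path (recoverPreemptedRequest == true) increments the live ask rather than overwriting it. A minimal model of that arithmetic, with invented names:

import java.util.HashMap;
import java.util.Map;

public class CloneAndRecover {
  public static void main(String[] args) {
    // Live ask book: resource name -> outstanding #containers
    Map<String, Integer> asks = new HashMap<>();
    asks.put("host1", 2);

    // Allocation clones the ask with numContainers pinned to 1 ...
    int storedNum = 1;                        // cloneResourceRequest analogue
    asks.put("host1", asks.get("host1") - 1); // ... and decrements the live ask

    // Recovery (recoverPreemptedRequest == true): increment, don't overwrite
    Integer last = asks.get("host1");
    asks.put("host1", (last == null ? 0 : last) + storedNum);

    System.out.println(asks.get("host1")); // back to 2
  }
}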

+ 8 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -241,7 +241,14 @@ public class SchedulerApplicationAttempt {
   public synchronized void updateResourceRequests(
       List<ResourceRequest> requests) {
     if (!isStopped) {
-      appSchedulingInfo.updateResourceRequests(requests);
+      appSchedulingInfo.updateResourceRequests(requests, false);
+    }
+  }
+  
+  public synchronized void recoverResourceRequests(
+      List<ResourceRequest> requests) {
+    if (!isStopped) {
+      appSchedulingInfo.updateResourceRequests(requests, true);
     }
   }
   

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -1089,6 +1089,7 @@ public class CapacityScheduler extends
     if (LOG.isDebugEnabled()) {
       LOG.debug("KILL_CONTAINER: container" + cont.toString());
     }
+    recoverResourceRequestForContainer(cont);
     completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
       cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
       RMContainerEventType.KILL);

+ 9 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -77,6 +78,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     if (null == liveContainers.remove(rmContainer.getContainerId())) {
       return false;
     }
+    
+    // Remove from the list of newly allocated containers if found
+    newlyAllocatedContainers.remove(rmContainer);
 
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
@@ -129,8 +133,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     liveContainers.put(container.getId(), rmContainer);    
 
     // Update consumption and track allocations
-    appSchedulingInfo.allocate(type, node, priority, request, container);
+    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+        type, node, priority, request, container);
     Resources.addTo(currentConsumption, container.getResource());
+    
+    // Update resource requests related to "request" and store in RMContainer 
+    ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
 
     // Inform the container
     rmContainer.handle(

+ 9 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -82,6 +83,9 @@ public class FSSchedulerApp extends SchedulerApplicationAttempt {
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
     
+    // Remove from the list of newly allocated containers if found
+    newlyAllocatedContainers.remove(rmContainer);
+    
     // Inform the container
     rmContainer.handle(
         new RMContainerFinishedEvent(
@@ -281,9 +285,13 @@ public class FSSchedulerApp extends SchedulerApplicationAttempt {
     liveContainers.put(container.getId(), rmContainer);    
 
     // Update consumption and track allocations
-    appSchedulingInfo.allocate(type, node, priority, request, container);
+    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+        type, node, priority, request, container);
     Resources.addTo(currentConsumption, container.getResource());
 
+    // Update resource requests related to "request" and store in RMContainer
+    ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
+
     // Inform the container
     rmContainer.handle(
         new RMContainerEvent(container.getId(), RMContainerEventType.START));

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -422,7 +422,7 @@ public class FairScheduler extends
     }
   }
   
-  private void warnOrKillContainer(RMContainer container) {
+  protected void warnOrKillContainer(RMContainer container) {
     ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
     FSSchedulerApp app = getSchedulerApp(appAttemptId);
     FSLeafQueue queue = app.getQueue();
@@ -440,6 +440,7 @@ public class FairScheduler extends
           SchedulerUtils.createPreemptedContainerStatus(
             container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
 
+        recoverResourceRequestForContainer(container);
         // TODO: Not sure if this ever actually adds this to the list of cleanup
         // containers on the RMNode (see SchedulerNode.releaseContainer()).
         completedContainer(container, status, RMContainerEventType.KILL);
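
warnOrKillContainer is widened from private to protected so the new FairScheduler test can drive it directly: the first call warns the application, and a second call after WAIT_TIME_BEFORE_KILL has elapsed recovers the asks and kills the container. A toy model of that two-phase protocol, using a mock clock and invented names:

public class WarnThenKill {
  static final long WAIT_MS = 10;  // stands in for WAIT_TIME_BEFORE_KILL
  static Long warnedAt = null;     // time of the first warning, if any
  static long now = 0;             // mock clock, like the test's MockClock

  static void warnOrKill() {
    if (warnedAt == null) {
      warnedAt = now;              // first call: warn and start the timer
      System.out.println("warned app, timer started");
    } else if (now - warnedAt >= WAIT_MS) {
      System.out.println("recover asks, then kill container");
    }
  }

  public static void main(String[] args) {
    warnOrKill();   // first call: warn
    now += 5;
    warnOrKill();   // within the grace period: nothing happens
    now += 5;
    warnOrKill();   // grace period elapsed: recover + kill
  }
}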

+ 42 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java

@@ -26,6 +26,9 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -36,17 +39,24 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -204,4 +214,36 @@ public class TestRMContainerImpl {
     assertEquals(RMContainerState.RUNNING, rmContainer.getState());
     verify(writer, never()).containerFinished(any(RMContainer.class));
   }
+  
+  @Test
+  public void testExistenceOfResourceRequestInRMContainer() throws Exception {
+    Configuration conf = new Configuration();
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
+    RMApp app1 = rm1.submitApp(1024);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    ResourceScheduler scheduler = rm1.getResourceScheduler();
+
+    // request a container.
+    am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
+    ContainerId containerId2 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+
+    // Verify whether list of ResourceRequest is present in RMContainer
+    // while moving to ALLOCATED state
+    Assert.assertNotNull(scheduler.getRMContainer(containerId2)
+        .getResourceRequests());
+
+    // Allocate container
+    am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>())
+        .getAllocatedContainers();
+    rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED);
+
+    // After RMContainer moving to ACQUIRED state, list of ResourceRequest will
+    // be empty
+    Assert.assertNull(scheduler.getRMContainer(containerId2)
+        .getResourceRequests());
+  }
 }

+ 67 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -79,6 +80,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -87,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -947,4 +951,67 @@ public class TestCapacityScheduler {
 
     rm1.stop();
   }
+  
+  @Test(timeout = 30000)
+  public void testRecoverRequestAfterPreemption() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
+    RMApp app1 = rm1.submitApp(1024);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    // request a container.
+    am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
+    ContainerId containerId1 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 2);
+    rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED);
+
+    RMContainer rmContainer = cs.getRMContainer(containerId1);
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+    FiCaSchedulerApp app = cs.getApplicationAttempt(am1
+        .getApplicationAttemptId());
+
+    FiCaSchedulerNode node = cs.getNode(rmContainer.getAllocatedNode());
+    for (ResourceRequest request : requests) {
+      // Skip the OffRack and RackLocal resource requests.
+      if (request.getResourceName().equals(node.getRackName())
+          || request.getResourceName().equals(ResourceRequest.ANY)) {
+        continue;
+      }
+
+      // Already the node local resource request is cleared from RM after
+      // allocation.
+      Assert.assertNull(app.getResourceRequest(request.getPriority(),
+          request.getResourceName()));
+    }
+
+    // Call killContainer to preempt the container
+    cs.killContainer(rmContainer);
+
+    Assert.assertEquals(3, requests.size());
+    for (ResourceRequest request : requests) {
+      // Resource request must have added back in RM after preempt event
+      // handling.
+      Assert.assertEquals(
+          1,
+          app.getResourceRequest(request.getPriority(),
+              request.getResourceName()).getNumContainers());
+    }
+
+    // New container will be allocated and will move to ALLOCATED state
+    ContainerId containerId2 = ContainerId.newInstance(
+        am1.getApplicationAttemptId(), 3);
+    rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+
+    // allocate container
+    List<Container> containers = am1.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+
+    // Now with updated ResourceRequest, a container is allocated for AM.
+    Assert.assertTrue(containers.size() == 1);
+  }
 }

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java

@@ -167,6 +167,27 @@ public class FairSchedulerTestBase {
         .put(id.getApplicationId(), rmApp);
     return id;
   }
+  
+  protected ApplicationAttemptId createSchedulingRequest(String queueId,
+      String userId, List<ResourceRequest> ask) {
+    ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
+        this.ATTEMPT_ID++);
+    scheduler.addApplication(id.getApplicationId(), queueId, userId);
+    // This conditional is for testAclSubmitApplication where app is rejected
+    // and no app is added.
+    if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
+      scheduler.addApplicationAttempt(id, false, true);
+    }
+    scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
+    RMApp rmApp = mock(RMApp.class);
+    RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
+    when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
+    when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
+        new RMAppAttemptMetrics(id));
+    resourceManager.getRMContext().getRMApps()
+        .put(id.getApplicationId(), rmApp);
+    return id;
+  }
 
   protected void createSchedulingRequestExistingApplication(
        int memory, int priority, ApplicationAttemptId attId) {

+ 86 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -53,10 +53,13 @@ import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
@@ -77,11 +80,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -2831,6 +2836,87 @@ public class TestFairScheduler extends FairSchedulerTestBase {
       }
     }
   }
+    
+  @Test(timeout=5000)
+  public void testRecoverRequestAfterPreemption() throws Exception {
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
+    
+    MockClock clock = new MockClock();
+    scheduler.setClock(clock);
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    Priority priority = Priority.newInstance(20);
+    String host = "127.0.0.1";
+    int GB = 1024;
+
+    // Create Node and raised Node Added event
+    RMNode node = MockNodes.newNodeInfo(1,
+        Resources.createResource(16 * 1024, 4), 0, host);
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    // Create 3 container requests and place it in ask
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
+        priority.getPriority(), 1, true);
+    ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
+        node.getRackName(), priority.getPriority(), 1, true);
+    ResourceRequest offRackRequest = createResourceRequest(GB, 1,
+        ResourceRequest.ANY, priority.getPriority(), 1, true);
+    ask.add(nodeLocalRequest);
+    ask.add(rackLocalRequest);
+    ask.add(offRackRequest);
+
+    // Create Request and update
+    ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
+        "user1", ask);
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeUpdate);
+
+    assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
+        .size());
+    FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId);
+
+    // ResourceRequest will be empty once NodeUpdate is completed
+    Assert.assertNull(app.getResourceRequest(priority, host));
+
+    ContainerId containerId1 = ContainerId.newInstance(appAttemptId, 1);
+    RMContainer rmContainer = app.getRMContainer(containerId1);
+
+    // Create a preempt event and register for preemption
+    scheduler.warnOrKillContainer(rmContainer);
+    
+    // Wait for few clock ticks
+    clock.tick(5);
+    
+    // preempt now
+    scheduler.warnOrKillContainer(rmContainer);
+
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+    // Once recovered, resource request will be present again in app
+    Assert.assertEquals(3, requests.size());
+    for (ResourceRequest request : requests) {
+      Assert.assertEquals(1,
+          app.getResourceRequest(priority, request.getResourceName())
+              .getNumContainers());
+    }
+
+    // Send node heartbeat
+    scheduler.update();
+    scheduler.handle(nodeUpdate);
+
+    List<Container> containers = scheduler.allocate(appAttemptId,
+        Collections.<ResourceRequest> emptyList(),
+        Collections.<ContainerId> emptyList(), null, null).getContainers();
+
+    // Now with updated ResourceRequest, a container is allocated for AM.
+    Assert.assertTrue(containers.size() == 1);
+  }
   
   @SuppressWarnings("resource")
   @Test