Forráskód Böngészése

YARN-2022 Preempting an Application Master container can be kept as least priority when multiple applications are marked for preemption by ProportionalCapacityPreemptionPolicy (Sunil G via mayank)
svn merge --ignore-ancestry -c 1607227 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1609615 13f79535-47bb-0310-9956-ffa450edef68

Zhijie Shen 11 éve
szülő
commit
9cc4e93fcd
8 módosított fájl, 278 hozzáadás és 15 törlés
  1. 77 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
  2. 5 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
  3. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
  4. 21 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
  5. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
  6. 38 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
  7. 115 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
  8. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java

+ 77 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java

@@ -111,7 +111,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   public static final String NATURAL_TERMINATION_FACTOR =
       "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
 
-  //the dispatcher to send preempt and kill events
+  // the dispatcher to send preempt and kill events
   public EventHandler<ContainerPreemptEvent> dispatcher;
 
   private final Clock clock;
@@ -437,8 +437,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
       List<TempQueue> queues, Resource clusterResource) {
 
-    Map<ApplicationAttemptId,Set<RMContainer>> list =
+    Map<ApplicationAttemptId,Set<RMContainer>> preemptMap =
         new HashMap<ApplicationAttemptId,Set<RMContainer>>();
+    List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
 
     for (TempQueue qT : queues) {
       // we act only if we are violating balance by more than
@@ -449,26 +450,83 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
         // accounts for natural termination of containers
         Resource resToObtain =
           Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
+        Resource skippedAMSize = Resource.newInstance(0, 0);
 
         // lock the leafqueue while we scan applications and unreserve
-        synchronized(qT.leafQueue) {
-          NavigableSet<FiCaSchedulerApp> ns =
-            (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
+        synchronized (qT.leafQueue) {
+          NavigableSet<FiCaSchedulerApp> ns = 
+              (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
           Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
           qT.actuallyPreempted = Resources.clone(resToObtain);
           while (desc.hasNext()) {
             FiCaSchedulerApp fc = desc.next();
-            if (Resources.lessThanOrEqual(rc, clusterResource,
-                resToObtain, Resources.none())) {
+            if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
+                Resources.none())) {
               break;
             }
-            list.put(fc.getApplicationAttemptId(),
-            preemptFrom(fc, clusterResource, resToObtain));
+            preemptMap.put(
+                fc.getApplicationAttemptId(),
+                preemptFrom(fc, clusterResource, resToObtain,
+                    skippedAMContainerlist, skippedAMSize));
           }
+          Resource maxAMCapacityForThisQueue = Resources.multiply(
+              Resources.multiply(clusterResource,
+                  qT.leafQueue.getAbsoluteCapacity()),
+              qT.leafQueue.getMaxAMResourcePerQueuePercent());
+
+          // Can try preempting AMContainers (still saving atmost
+          // maxAMCapacityForThisQueue AMResource's) if more resources are
+          // required to be preempted from this Queue.
+          preemptAMContainers(clusterResource, preemptMap,
+              skippedAMContainerlist, resToObtain, skippedAMSize,
+              maxAMCapacityForThisQueue);
         }
       }
     }
-    return list;
+    return preemptMap;
+  }
+
+  /**
+   * As more resources are needed for preemption, saved AMContainers has to be
+   * rescanned. Such AMContainers can be preempted based on resToObtain, but 
+   * maxAMCapacityForThisQueue resources will be still retained.
+   *  
+   * @param clusterResource
+   * @param preemptMap
+   * @param skippedAMContainerlist
+   * @param resToObtain
+   * @param skippedAMSize
+   * @param maxAMCapacityForThisQueue
+   */
+  private void preemptAMContainers(Resource clusterResource,
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      List<RMContainer> skippedAMContainerlist, Resource resToObtain,
+      Resource skippedAMSize, Resource maxAMCapacityForThisQueue) {
+    for (RMContainer c : skippedAMContainerlist) {
+      // Got required amount of resources for preemption, can stop now
+      if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
+          Resources.none())) {
+        break;
+      }
+      // Once skippedAMSize reaches down to maxAMCapacityForThisQueue,
+      // container selection iteration for preemption will be stopped. 
+      if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize,
+          maxAMCapacityForThisQueue)) {
+        break;
+      }
+      Set<RMContainer> contToPrempt = preemptMap.get(c
+          .getApplicationAttemptId());
+      if (null == contToPrempt) {
+        contToPrempt = new HashSet<RMContainer>();
+        preemptMap.put(c.getApplicationAttemptId(), contToPrempt);
+      }
+      contToPrempt.add(c);
+      
+      Resources.subtractFrom(resToObtain, c.getContainer().getResource());
+      Resources.subtractFrom(skippedAMSize, c.getContainer()
+          .getResource());
+    }
+    skippedAMContainerlist.clear();
   }
 
   /**
@@ -480,8 +538,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * @param rsrcPreempt
    * @return
    */
-  private Set<RMContainer> preemptFrom(
-      FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) {
+  private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
+      Resource clusterResource, Resource rsrcPreempt,
+      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize) {
     Set<RMContainer> ret = new HashSet<RMContainer>();
     ApplicationAttemptId appId = app.getApplicationAttemptId();
 
@@ -513,6 +572,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
             rsrcPreempt, Resources.none())) {
         return ret;
       }
+      // Skip AM Container from preemption for now.
+      if (c.isAMContainer()) {
+        skippedAMContainerlist.add(c);
+        Resources.addTo(skippedAMSize, c.getContainer().getResource());
+        continue;
+      }
       ret.add(c);
       Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
     }

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -850,7 +851,10 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
 
       // Set the masterContainer
       appAttempt.setMasterContainer(amContainerAllocation.getContainers()
-        .get(0));
+          .get(0));
+      RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler
+          .getRMContainer(appAttempt.getMasterContainer().getId());
+      rmMasterContainer.setAMContainer(true);
       // The node set in NMTokenSecrentManager is used for marking whether the
       // NMToken has been issued for this node to the AM.
       // When AM container was allocated to RM itself, the node which allocates

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java

@@ -71,5 +71,7 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
   ContainerState getContainerState();
   
   ContainerReport createContainerReport();
+  
+  boolean isAMContainer();
 
 }

+ 21 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java

@@ -157,6 +157,7 @@ public class RMContainerImpl implements RMContainer {
   private long creationTime;
   private long finishTime;
   private ContainerStatus finishedStatus;
+  private boolean isAMContainer;
 
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -178,6 +179,7 @@ public class RMContainerImpl implements RMContainer {
     this.rmContext = rmContext;
     this.eventHandler = rmContext.getDispatcher().getEventHandler();
     this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
+    this.isAMContainer = false;
     
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
@@ -315,6 +317,25 @@ public class RMContainerImpl implements RMContainer {
     return containerId.toString();
   }
   
+  @Override
+  public boolean isAMContainer() {
+    try {
+      readLock.lock();
+      return isAMContainer;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public void setAMContainer(boolean isAMContainer) {
+    try {
+      writeLock.lock();
+      this.isAMContainer = isAMContainer;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+  
   @Override
   public void handle(RMContainerEvent event) {
     LOG.debug("Processing " + event.getContainerId() + " of type " + event.getType());
@@ -512,5 +533,4 @@ public class RMContainerImpl implements RMContainer {
     }
     return containerReport;
   }
-
 }

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
@@ -242,6 +243,20 @@ public abstract class AbstractYarnScheduler
 
       // recover scheduler attempt
       schedulerAttempt.recoverContainer(rmContainer);
+            
+      // set master container for the current running AMContainer for this
+      // attempt.
+      RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt();
+      if (appAttempt != null) {
+        Container masterContainer = appAttempt.getMasterContainer();
+
+        // Mark current running AMContainer's RMContainer based on the master
+        // container ID stored in AppAttempt.
+        if (masterContainer != null
+            && masterContainer.getId().equals(rmContainer.getContainerId())) {
+          ((RMContainerImpl)rmContainer).setAMContainer(true);
+        }
+      }
     }
   }
 

+ 38 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java

@@ -62,6 +62,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -564,6 +565,43 @@ public class TestWorkPreservingRMRestart {
     rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
     rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
   }
+  
+  @Test (timeout = 30000)
+  public void testAMContainerStatusWithRMRestart() throws Exception {  
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
+    nm1.registerNode();
+    RMApp app1_1 = rm1.submitApp(1024);
+    MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1);
+    
+    RMAppAttempt attempt0 = app1_1.getCurrentAppAttempt();
+    AbstractYarnScheduler scheduler =
+        ((AbstractYarnScheduler) rm1.getResourceScheduler());
+    
+    Assert.assertTrue(scheduler.getRMContainer(
+        attempt0.getMasterContainer().getId()).isAMContainer());
+
+    // Re-start RM
+    rm2 = new MockRM(conf, memStore);
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+    List<NMContainerStatus> am1_1Containers =
+        createNMContainerStatusForApp(am1_1);
+    nm1.registerNode(am1_1Containers, null);
+
+    // Wait for RM to settle down on recovering containers;
+    waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId());
+
+    scheduler = ((AbstractYarnScheduler) rm2.getResourceScheduler());
+    Assert.assertTrue(scheduler.getRMContainer(
+        attempt0.getMasterContainer().getId()).isAMContainer());
+  }
+
 
   private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
       int appsPending, int appsRunning, int appsCompleted,

+ 115 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java

@@ -80,6 +80,8 @@ public class TestProportionalCapacityPreemptionPolicy {
   static final long TS = 3141592653L;
 
   int appAlloc = 0;
+  boolean setAMContainer = false;
+  float setAMResourcePercent = 0.0f;
   Random rand = null;
   Clock mClock = null;
   Configuration conf = null;
@@ -466,7 +468,108 @@ public class TestProportionalCapacityPreemptionPolicy {
     
     fail("Failed to find SchedulingMonitor service, please check what happened");
   }
+  
+  @Test
+  public void testSkipAMContainer() {
+    int[][] qData = new int[][] {
+        //  /   A   B
+        { 100, 50, 50 }, // abs
+        { 100, 100, 100 }, // maxcap
+        { 100, 100, 0 }, // used
+        { 70, 20, 50 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 1, 1 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    setAMContainer = true;
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    
+    // By skipping AM Container, all other 24 containers of appD will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // By skipping AM Container, all other 24 containers of appC will be
+    // preempted
+    verify(mDisp, times(24)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // Since AM containers of appC and appD are saved, 2 containers from appB
+    // has to be preempted.
+    verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    setAMContainer = false;
+  }
+  
+  @Test
+  public void testPreemptSkippedAMContainers() {
+    int[][] qData = new int[][] {
+        //  /   A   B
+        { 100, 10, 90 }, // abs
+        { 100, 100, 100 }, // maxcap
+        { 100, 100, 0 }, // used
+        { 70, 20, 90 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 5, 5 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    setAMContainer = true;
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    
+    // All 5 containers of appD will be preempted including AM container.
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD)));
 
+    // All 5 containers of appC will be preempted including AM container.
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    
+    // By skipping AM Container, all other 4 containers of appB will be
+    // preempted
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // By skipping AM Container, all other 4 containers of appA will be
+    // preempted
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    setAMContainer = false;
+  }
+  
+  @Test
+  public void testAMResourcePercentForSkippedAMContainers() {
+    int[][] qData = new int[][] {
+        //  /   A   B
+        { 100, 10, 90 }, // abs
+        { 100, 100, 100 }, // maxcap
+        { 100, 100, 0 }, // used
+        { 70, 20, 90 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 5, 5 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    setAMContainer = true;
+    setAMResourcePercent = 0.5f;
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    
+    // AMResoucePercent is 50% of cluster and maxAMCapacity will be 5Gb.
+    // Total used AM container size is 20GB, hence 2 AM container has
+    // to be preempted as Queue Capacity is 10Gb.
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // Including AM Container, all other 4 containers of appC will be
+    // preempted
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    
+    // By skipping AM Container, all other 4 containers of appB will be
+    // preempted
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
+
+    // By skipping AM Container, all other 4 containers of appA will be
+    // preempted
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    setAMContainer = false;
+  }
+  
   static class IsPreemptionRequestFor
       extends ArgumentMatcher<ContainerPreemptEvent> {
     private final ApplicationAttemptId appAttId;
@@ -583,6 +686,9 @@ public class TestProportionalCapacityPreemptionPolicy {
       }
     }
     when(lq.getApplications()).thenReturn(qApps);
+    if(setAMResourcePercent != 0.0f){
+      when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
+    }
     p.getChildQueues().add(lq);
     return lq;
   }
@@ -607,7 +713,11 @@ public class TestProportionalCapacityPreemptionPolicy {
 
     List<RMContainer> cLive = new ArrayList<RMContainer>();
     for (int i = 0; i < used; i += gran) {
-      cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+      if(setAMContainer && i == 0){
+        cLive.add(mockContainer(appAttId, cAlloc, unit, 0));
+      }else{
+        cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+      }
       ++cAlloc;
     }
     when(app.getLiveContainers()).thenReturn(cLive);
@@ -623,6 +733,10 @@ public class TestProportionalCapacityPreemptionPolicy {
     RMContainer mC = mock(RMContainer.class);
     when(mC.getContainerId()).thenReturn(cId);
     when(mC.getContainer()).thenReturn(c);
+    when(mC.getApplicationAttemptId()).thenReturn(appAttId);
+    if(0 == priority){
+      when(mC.isAMContainer()).thenReturn(true);
+    }
     return mC;
   }
 

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java

@@ -86,6 +86,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -600,6 +602,9 @@ public class TestRMAppAttemptTransitions {
             any(List.class), 
             any(List.class))).
     thenReturn(allocation);
+    RMContainer rmContainer = mock(RMContainerImpl.class);
+    when(scheduler.getRMContainer(container.getId())).
+        thenReturn(rmContainer);
     
     applicationAttempt.handle(
         new RMAppAttemptContainerAllocatedEvent(