浏览代码

YARN-3508. Prevent processing preemption events on the main RM dispatcher. (Varun Saxena via wangda)

Wangda Tan 10 年之前
父节点
当前提交
a6f6ba95ef
共有 11 个文件被更改,包括 151 次插入100 次删除
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 0 34
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  3. 2 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
  4. 1 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
  5. 20 20
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
  6. 4 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
  7. 0 26
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java
  8. 24 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  9. 6 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
  10. 79 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
  11. 12 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -15,6 +15,9 @@ Release 2.7.2 - UNRELEASED
     YARN-3793. Several NPEs when deleting local files on NM recovery (Varun
     Saxena via jlowe)
 
+    YARN-3508. Prevent processing preemption events on the main RM dispatcher. 
+    (Varun Saxena via wangda)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 0 - 34
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -610,9 +609,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
             YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
             SchedulingEditPolicy.class);
         if (policies.size() > 0) {
-          rmDispatcher.register(ContainerPreemptEventType.class,
-              new RMContainerPreemptEventDispatcher(
-                  (PreemptableResourceScheduler) scheduler));
           for (SchedulingEditPolicy policy : policies) {
             LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
             // periodically check whether we need to take action to guarantee
@@ -782,36 +778,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
     }
   }
 
-  @Private
-  public static final class
-    RMContainerPreemptEventDispatcher
-      implements EventHandler<ContainerPreemptEvent> {
-
-    private final PreemptableResourceScheduler scheduler;
-
-    public RMContainerPreemptEventDispatcher(
-        PreemptableResourceScheduler scheduler) {
-      this.scheduler = scheduler;
-    }
-
-    @Override
-    public void handle(ContainerPreemptEvent event) {
-      ApplicationAttemptId aid = event.getAppId();
-      RMContainer container = event.getContainer();
-      switch (event.getType()) {
-      case DROP_RESERVATION:
-        scheduler.dropContainerReservation(container);
-        break;
-      case PREEMPT_CONTAINER:
-        scheduler.preemptContainer(aid, container);
-        break;
-      case KILL_CONTAINER:
-        scheduler.killContainer(container);
-        break;
-      }
-    }
-  }
-
   @Private
   public static final class ApplicationAttemptEventDispatcher implements
       EventHandler<RMAppAttemptEvent> {

+ 2 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java

@@ -18,14 +18,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 
 public interface SchedulingEditPolicy {
 
-  public void init(Configuration config,
-      EventHandler<ContainerPreemptEvent> dispatcher,
+  public void init(Configuration config, RMContext context,
       PreemptableResourceScheduler scheduler);
 
   /**

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java

@@ -54,9 +54,8 @@ public class SchedulingMonitor extends AbstractService {
     return scheduleEditPolicy;
   }
 
-  @SuppressWarnings("unchecked")
   public void serviceInit(Configuration conf) throws Exception {
-    scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(),
+    scheduleEditPolicy.init(conf, rmContext,
         (PreemptableResourceScheduler) rmContext.getScheduler());
     this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
     super.serviceInit(conf);

+ 20 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java

@@ -30,7 +30,6 @@ import java.util.NavigableSet;
 import java.util.PriorityQueue;
 import java.util.Set;
 
-import org.apache.commons.collections.map.HashedMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -38,19 +37,18 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -116,8 +114,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   public static final String NATURAL_TERMINATION_FACTOR =
       "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
 
-  // the dispatcher to send preempt and kill events
-  public EventHandler<ContainerPreemptEvent> dispatcher;
+  private RMContext rmContext;
 
   private final Clock clock;
   private double maxIgnoredOverCapacity;
@@ -137,20 +134,17 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   }
 
   public ProportionalCapacityPreemptionPolicy(Configuration config,
-      EventHandler<ContainerPreemptEvent> dispatcher,
-      CapacityScheduler scheduler) {
-    this(config, dispatcher, scheduler, new SystemClock());
+      RMContext context, CapacityScheduler scheduler) {
+    this(config, context, scheduler, new SystemClock());
   }
 
   public ProportionalCapacityPreemptionPolicy(Configuration config,
-      EventHandler<ContainerPreemptEvent> dispatcher,
-      CapacityScheduler scheduler, Clock clock) {
-    init(config, dispatcher, scheduler);
+      RMContext context, CapacityScheduler scheduler, Clock clock) {
+    init(config, context, scheduler);
     this.clock = clock;
   }
 
-  public void init(Configuration config,
-      EventHandler<ContainerPreemptEvent> disp,
+  public void init(Configuration config, RMContext context,
       PreemptableResourceScheduler sched) {
     LOG.info("Preemption monitor:" + this.getClass().getCanonicalName());
     assert null == scheduler : "Unexpected duplicate call to init";
@@ -159,7 +153,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
           sched.getClass().getCanonicalName() + " not instance of " +
           CapacityScheduler.class.getCanonicalName());
     }
-    dispatcher = disp;
+    rmContext = context;
     scheduler = (CapacityScheduler) sched;
     maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
     naturalTerminationFactor =
@@ -218,6 +212,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * @param root the root of the CapacityScheduler queue hierarchy
    * @param clusterResources the total amount of resources in the cluster
    */
+  @SuppressWarnings("unchecked")
   private void containerBasedPreemptOrKill(CSQueue root,
       Resource clusterResources) {
 
@@ -252,18 +247,21 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // preempt (or kill) the selected containers
     for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
          : toPreempt.entrySet()) {
+      ApplicationAttemptId appAttemptId = e.getKey();
       for (RMContainer container : e.getValue()) {
         // if we tried to preempt this for more than maxWaitTime
         if (preempted.get(container) != null &&
             preempted.get(container) + maxWaitTime < clock.getTime()) {
           // kill it
-          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
-                ContainerPreemptEventType.KILL_CONTAINER));
+          rmContext.getDispatcher().getEventHandler().handle(
+              new ContainerPreemptEvent(appAttemptId, container,
+                  SchedulerEventType.KILL_CONTAINER));
           preempted.remove(container);
         } else {
           //otherwise just send preemption events
-          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
-                ContainerPreemptEventType.PREEMPT_CONTAINER));
+          rmContext.getDispatcher().getEventHandler().handle(
+              new ContainerPreemptEvent(appAttemptId, container,
+                  SchedulerEventType.PREEMPT_CONTAINER));
           if (preempted.get(container) == null) {
             preempted.put(container, clock.getTime());
           }
@@ -634,6 +632,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * @param rsrcPreempt
    * @return Set<RMContainer> Set of RMContainers
    */
+  @SuppressWarnings("unchecked")
   private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
       Resource clusterResource, Resource rsrcPreempt,
       List<RMContainer> skippedAMContainerlist, Resource skippedAMSize) {
@@ -649,8 +648,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
         return ret;
       }
       if (!observeOnly) {
-        dispatcher.handle(new ContainerPreemptEvent(appId, c,
-            ContainerPreemptEventType.DROP_RESERVATION));
+        rmContext.getDispatcher().getEventHandler().handle(
+            new ContainerPreemptEvent(
+                appId, c, SchedulerEventType.DROP_RESERVATION));
       }
       Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
     }

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java

@@ -19,20 +19,20 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 
 /**
  * Simple event class used to communicate containers unreservations, preemption, killing
  */
-public class ContainerPreemptEvent
-    extends AbstractEvent<ContainerPreemptEventType> {
+public class ContainerPreemptEvent extends SchedulerEvent {
 
   private final ApplicationAttemptId aid;
   private final RMContainer container;
 
   public ContainerPreemptEvent(ApplicationAttemptId aid, RMContainer container,
-      ContainerPreemptEventType type) {
+      SchedulerEventType type) {
     super(type);
     this.aid = aid;
     this.container = container;

+ 0 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java

@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-
-public enum ContainerPreemptEventType {
-
-  DROP_RESERVATION,
-  PREEMPT_CONTAINER,
-  KILL_CONTAINER
-
-}

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -84,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@@ -1212,6 +1213,29 @@ public class CapacityScheduler extends
           RMContainerEventType.EXPIRE);
     }
     break;
+    case DROP_RESERVATION:
+    {
+      ContainerPreemptEvent dropReservationEvent = (ContainerPreemptEvent)event;
+      RMContainer container = dropReservationEvent.getContainer();
+      dropContainerReservation(container);
+    }
+    break;
+    case PREEMPT_CONTAINER:
+    {
+      ContainerPreemptEvent preemptContainerEvent =
+          (ContainerPreemptEvent)event;
+      ApplicationAttemptId aid = preemptContainerEvent.getAppId();
+      RMContainer containerToBePreempted = preemptContainerEvent.getContainer();
+      preemptContainer(aid, containerToBePreempted);
+    }
+    break;
+    case KILL_CONTAINER:
+    {
+      ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event;
+      RMContainer containerToBeKilled = killContainerEvent.getContainer();
+      killContainer(containerToBeKilled);
+    }
+    break;
     default:
       LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
     }

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java

@@ -36,5 +36,10 @@ public enum SchedulerEventType {
   APP_ATTEMPT_REMOVED,
 
   // Source: ContainerAllocationExpirer
-  CONTAINER_EXPIRED
+  CONTAINER_EXPIRED,
+
+  // Source: SchedulingEditPolicy
+  DROP_RESERVATION,
+  PREEMPT_CONTAINER,
+  KILL_CONTAINER
 }

+ 79 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java

@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRMDispatcher {
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout=10000)
+  public void testSchedulerEventDispatcherForPreemptionEvents() {
+    AsyncDispatcher rmDispatcher = new AsyncDispatcher();
+    CapacityScheduler sched = spy(new CapacityScheduler());
+    YarnConfiguration conf = new YarnConfiguration();
+    SchedulerEventDispatcher schedulerDispatcher =
+        new SchedulerEventDispatcher(sched);
+    rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
+    rmDispatcher.init(conf);
+    rmDispatcher.start();
+    schedulerDispatcher.init(conf);
+    schedulerDispatcher.start();
+    try {
+      ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class);
+      RMContainer container = mock(RMContainer.class);
+      ContainerPreemptEvent event1 = new ContainerPreemptEvent(
+          appAttemptId, container, SchedulerEventType.DROP_RESERVATION);
+      rmDispatcher.getEventHandler().handle(event1);
+      ContainerPreemptEvent event2 = new ContainerPreemptEvent(
+           appAttemptId, container, SchedulerEventType.KILL_CONTAINER);
+      rmDispatcher.getEventHandler().handle(event2);
+      ContainerPreemptEvent event3 = new ContainerPreemptEvent(
+          appAttemptId, container, SchedulerEventType.PREEMPT_CONTAINER);
+      rmDispatcher.getEventHandler().handle(event3);
+      // Wait for events to be processed by scheduler dispatcher.
+      Thread.sleep(1000);
+      verify(sched, times(3)).handle(any(SchedulerEvent.class));
+      verify(sched).dropContainerReservation(container);
+      verify(sched).preemptContainer(appAttemptId, container);
+      verify(sched).killContainer(container);
+    } catch (InterruptedException e) {
+      Assert.fail();
+    } finally {
+      schedulerDispatcher.stop();
+      rmDispatcher.stop();
+    }
+  }
+}

+ 12 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java

@@ -23,8 +23,8 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.PREEMPT_CONTAINER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -53,7 +53,6 @@ import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.TreeSet;
 
-import org.apache.commons.collections.map.HashedMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -63,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -71,13 +71,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -88,7 +89,6 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
-import org.mortbay.log.Log;
 
 public class TestProportionalCapacityPreemptionPolicy {
 
@@ -105,7 +105,7 @@ public class TestProportionalCapacityPreemptionPolicy {
   RMContext rmContext = null;
   RMNodeLabelsManager lm = null;
   CapacitySchedulerConfiguration schedConf = null;
-  EventHandler<ContainerPreemptEvent> mDisp = null;
+  EventHandler<SchedulerEvent> mDisp = null;
   ResourceCalculator rc = new DefaultResourceCalculator();
   Resource clusterResources = null;
   final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
@@ -165,6 +165,9 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(mCS.getRMContext()).thenReturn(rmContext);
     when(rmContext.getNodeLabelManager()).thenReturn(lm);
     mDisp = mock(EventHandler.class);
+    Dispatcher disp = mock(Dispatcher.class);
+    when(rmContext.getDispatcher()).thenReturn(disp);
+    when(disp.getEventHandler()).thenReturn(mDisp);
     rand = new Random();
     long seed = rand.nextLong();
     System.out.println(name.getMethodName() + " SEED: " + seed);
@@ -911,12 +914,12 @@ public class TestProportionalCapacityPreemptionPolicy {
   static class IsPreemptionRequestFor
       extends ArgumentMatcher<ContainerPreemptEvent> {
     private final ApplicationAttemptId appAttId;
-    private final ContainerPreemptEventType type;
+    private final SchedulerEventType type;
     IsPreemptionRequestFor(ApplicationAttemptId appAttId) {
       this(appAttId, PREEMPT_CONTAINER);
     }
     IsPreemptionRequestFor(ApplicationAttemptId appAttId,
-        ContainerPreemptEventType type) {
+        SchedulerEventType type) {
       this.appAttId = appAttId;
       this.type = type;
     }
@@ -933,7 +936,7 @@ public class TestProportionalCapacityPreemptionPolicy {
 
   ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
     ProportionalCapacityPreemptionPolicy policy =
-      new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
+      new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
     ParentQueue mRoot = buildMockRootQueue(rand, qData);
     when(mCS.getRootQueue()).thenReturn(mRoot);