瀏覽代碼

YARN-3508. Prevent processing preemption events on the main RM dispatcher. (Varun Saxena via wangda)

(cherry picked from commit 0e4b06690ff51fbde3ab26f68fde8aeb32af69af)
Wangda Tan 10 年之前
父節點
當前提交
d61dd10b50
共有 12 個文件被更改,包括 161 次插入102 次删除
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 0 34
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  3. 2 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
  4. 1 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
  5. 22 20
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
  6. 4 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
  7. 0 26
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java
  8. 24 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  9. 6 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
  10. 79 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
  11. 12 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
  12. 8 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -553,6 +553,9 @@ Release 2.7.2 - UNRELEASED
     YARN-3793. Several NPEs when deleting local files on NM recovery (Varun
     Saxena via jlowe)
 
+    YARN-3508. Prevent processing preemption events on the main RM dispatcher. 
+    (Varun Saxena via wangda)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 0 - 34
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
@@ -614,9 +613,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
             YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
             SchedulingEditPolicy.class);
         if (policies.size() > 0) {
-          rmDispatcher.register(ContainerPreemptEventType.class,
-              new RMContainerPreemptEventDispatcher(
-                  (PreemptableResourceScheduler) scheduler));
           for (SchedulingEditPolicy policy : policies) {
             LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
             // periodically check whether we need to take action to guarantee
@@ -786,36 +782,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
     }
   }
 
-  @Private
-  public static final class
-    RMContainerPreemptEventDispatcher
-      implements EventHandler<ContainerPreemptEvent> {
-
-    private final PreemptableResourceScheduler scheduler;
-
-    public RMContainerPreemptEventDispatcher(
-        PreemptableResourceScheduler scheduler) {
-      this.scheduler = scheduler;
-    }
-
-    @Override
-    public void handle(ContainerPreemptEvent event) {
-      ApplicationAttemptId aid = event.getAppId();
-      RMContainer container = event.getContainer();
-      switch (event.getType()) {
-      case DROP_RESERVATION:
-        scheduler.dropContainerReservation(container);
-        break;
-      case PREEMPT_CONTAINER:
-        scheduler.preemptContainer(aid, container);
-        break;
-      case KILL_CONTAINER:
-        scheduler.killContainer(container);
-        break;
-      }
-    }
-  }
-
   @Private
   public static final class ApplicationAttemptEventDispatcher implements
       EventHandler<RMAppAttemptEvent> {

+ 2 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java

@@ -18,14 +18,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 
 public interface SchedulingEditPolicy {
 
-  public void init(Configuration config,
-      EventHandler<ContainerPreemptEvent> dispatcher,
+  public void init(Configuration config, RMContext context,
       PreemptableResourceScheduler scheduler);
 
   /**

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java

@@ -54,9 +54,8 @@ public class SchedulingMonitor extends AbstractService {
     return scheduleEditPolicy;
   }
 
-  @SuppressWarnings("unchecked")
   public void serviceInit(Configuration conf) throws Exception {
-    scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(),
+    scheduleEditPolicy.init(conf, rmContext,
         (PreemptableResourceScheduler) rmContext.getScheduler());
     this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
     super.serviceInit(conf);

+ 22 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java

@@ -38,13 +38,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -52,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -118,8 +118,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   public static final String NATURAL_TERMINATION_FACTOR =
       "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
 
-  // the dispatcher to send preempt and kill events
-  public EventHandler<ContainerPreemptEvent> dispatcher;
+  private RMContext rmContext;
 
   private final Clock clock;
   private double maxIgnoredOverCapacity;
@@ -141,20 +140,17 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   }
 
   public ProportionalCapacityPreemptionPolicy(Configuration config,
-      EventHandler<ContainerPreemptEvent> dispatcher,
-      CapacityScheduler scheduler) {
-    this(config, dispatcher, scheduler, new SystemClock());
+      RMContext context, CapacityScheduler scheduler) {
+    this(config, context, scheduler, new SystemClock());
   }
 
   public ProportionalCapacityPreemptionPolicy(Configuration config,
-      EventHandler<ContainerPreemptEvent> dispatcher,
-      CapacityScheduler scheduler, Clock clock) {
-    init(config, dispatcher, scheduler);
+      RMContext context, CapacityScheduler scheduler, Clock clock) {
+    init(config, context, scheduler);
     this.clock = clock;
   }
 
-  public void init(Configuration config,
-      EventHandler<ContainerPreemptEvent> disp,
+  public void init(Configuration config, RMContext context,
       PreemptableResourceScheduler sched) {
     LOG.info("Preemption monitor:" + this.getClass().getCanonicalName());
     assert null == scheduler : "Unexpected duplicate call to init";
@@ -163,7 +159,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
           sched.getClass().getCanonicalName() + " not instance of " +
           CapacityScheduler.class.getCanonicalName());
     }
-    dispatcher = disp;
+    rmContext = context;
     scheduler = (CapacityScheduler) sched;
     maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
     naturalTerminationFactor =
@@ -196,6 +192,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * @param root the root of the CapacityScheduler queue hierarchy
    * @param clusterResources the total amount of resources in the cluster
    */
+  @SuppressWarnings("unchecked")
   private void containerBasedPreemptOrKill(CSQueue root,
       Resource clusterResources) {
     // All partitions to look at
@@ -248,8 +245,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
     // preempt (or kill) the selected containers
     for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
          : toPreempt.entrySet()) {
+      ApplicationAttemptId appAttemptId = e.getKey();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Send to scheduler: in app=" + e.getKey()
+        LOG.debug("Send to scheduler: in app=" + appAttemptId
             + " #containers-to-be-preempted=" + e.getValue().size());
       }
       for (RMContainer container : e.getValue()) {
@@ -257,13 +255,15 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
         if (preempted.get(container) != null &&
             preempted.get(container) + maxWaitTime < clock.getTime()) {
           // kill it
-          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
-                ContainerPreemptEventType.KILL_CONTAINER));
+          rmContext.getDispatcher().getEventHandler().handle(
+              new ContainerPreemptEvent(appAttemptId, container,
+                  SchedulerEventType.KILL_CONTAINER));
           preempted.remove(container);
         } else {
           //otherwise just send preemption events
-          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
-                ContainerPreemptEventType.PREEMPT_CONTAINER));
+          rmContext.getDispatcher().getEventHandler().handle(
+              new ContainerPreemptEvent(appAttemptId, container,
+                  SchedulerEventType.PREEMPT_CONTAINER));
           if (preempted.get(container) == null) {
             preempted.put(container, clock.getTime());
           }
@@ -735,6 +735,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * Given a target preemption for a specific application, select containers
    * to preempt (after unreserving all reservation for that app).
    */
+  @SuppressWarnings("unchecked")
   private void preemptFrom(FiCaSchedulerApp app,
       Resource clusterResource, Map<String, Resource> resToObtainByPartition,
       List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
@@ -758,8 +759,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
           clusterResource, preemptMap);
 
       if (!observeOnly) {
-        dispatcher.handle(new ContainerPreemptEvent(appId, c,
-            ContainerPreemptEventType.DROP_RESERVATION));
+        rmContext.getDispatcher().getEventHandler().handle(
+            new ContainerPreemptEvent(
+                appId, c, SchedulerEventType.DROP_RESERVATION));
       }
     }
 

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java

@@ -19,20 +19,20 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 
 /**
  * Simple event class used to communicate containers unreservations, preemption, killing
  */
-public class ContainerPreemptEvent
-    extends AbstractEvent<ContainerPreemptEventType> {
+public class ContainerPreemptEvent extends SchedulerEvent {
 
   private final ApplicationAttemptId aid;
   private final RMContainer container;
 
   public ContainerPreemptEvent(ApplicationAttemptId aid, RMContainer container,
-      ContainerPreemptEventType type) {
+      SchedulerEventType type) {
     super(type);
     this.aid = aid;
     this.container = container;

+ 0 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java

@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
-
-public enum ContainerPreemptEventType {
-
-  DROP_RESERVATION,
-  PREEMPT_CONTAINER,
-  KILL_CONTAINER
-
-}

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -1346,6 +1347,29 @@ public class CapacityScheduler extends
           RMContainerEventType.EXPIRE);
     }
     break;
+    case DROP_RESERVATION:
+    {
+      ContainerPreemptEvent dropReservationEvent = (ContainerPreemptEvent)event;
+      RMContainer container = dropReservationEvent.getContainer();
+      dropContainerReservation(container);
+    }
+    break;
+    case PREEMPT_CONTAINER:
+    {
+      ContainerPreemptEvent preemptContainerEvent =
+          (ContainerPreemptEvent)event;
+      ApplicationAttemptId aid = preemptContainerEvent.getAppId();
+      RMContainer containerToBePreempted = preemptContainerEvent.getContainer();
+      preemptContainer(aid, containerToBePreempted);
+    }
+    break;
+    case KILL_CONTAINER:
+    {
+      ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event;
+      RMContainer containerToBeKilled = killContainerEvent.getContainer();
+      killContainer(containerToBeKilled);
+    }
+    break;
     default:
       LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
     }

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java

@@ -36,5 +36,10 @@ public enum SchedulerEventType {
   APP_ATTEMPT_REMOVED,
 
   // Source: ContainerAllocationExpirer
-  CONTAINER_EXPIRED
+  CONTAINER_EXPIRED,
+
+  // Source: SchedulingEditPolicy
+  DROP_RESERVATION,
+  PREEMPT_CONTAINER,
+  KILL_CONTAINER
 }

+ 79 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java

@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRMDispatcher {
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout=10000)
+  public void testSchedulerEventDispatcherForPreemptionEvents() {
+    AsyncDispatcher rmDispatcher = new AsyncDispatcher();
+    CapacityScheduler sched = spy(new CapacityScheduler());
+    YarnConfiguration conf = new YarnConfiguration();
+    SchedulerEventDispatcher schedulerDispatcher =
+        new SchedulerEventDispatcher(sched);
+    rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
+    rmDispatcher.init(conf);
+    rmDispatcher.start();
+    schedulerDispatcher.init(conf);
+    schedulerDispatcher.start();
+    try {
+      ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class);
+      RMContainer container = mock(RMContainer.class);
+      ContainerPreemptEvent event1 = new ContainerPreemptEvent(
+          appAttemptId, container, SchedulerEventType.DROP_RESERVATION);
+      rmDispatcher.getEventHandler().handle(event1);
+      ContainerPreemptEvent event2 = new ContainerPreemptEvent(
+           appAttemptId, container, SchedulerEventType.KILL_CONTAINER);
+      rmDispatcher.getEventHandler().handle(event2);
+      ContainerPreemptEvent event3 = new ContainerPreemptEvent(
+          appAttemptId, container, SchedulerEventType.PREEMPT_CONTAINER);
+      rmDispatcher.getEventHandler().handle(event3);
+      // Wait for events to be processed by scheduler dispatcher.
+      Thread.sleep(1000);
+      verify(sched, times(3)).handle(any(SchedulerEvent.class));
+      verify(sched).dropContainerReservation(container);
+      verify(sched).preemptContainer(appAttemptId, container);
+      verify(sched).killContainer(container);
+    } catch (InterruptedException e) {
+      Assert.fail();
+    } finally {
+      schedulerDispatcher.stop();
+      rmDispatcher.stop();
+    }
+  }
+}

+ 12 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java

@@ -23,8 +23,8 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.PREEMPT_CONTAINER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -66,7 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@@ -76,6 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -104,7 +106,7 @@ public class TestProportionalCapacityPreemptionPolicy {
   RMContext rmContext = null;
   RMNodeLabelsManager lm = null;
   CapacitySchedulerConfiguration schedConf = null;
-  EventHandler<ContainerPreemptEvent> mDisp = null;
+  EventHandler<SchedulerEvent> mDisp = null;
   ResourceCalculator rc = new DefaultResourceCalculator();
   Resource clusterResources = null;
   final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
@@ -164,6 +166,9 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(mCS.getRMContext()).thenReturn(rmContext);
     when(rmContext.getNodeLabelManager()).thenReturn(lm);
     mDisp = mock(EventHandler.class);
+    Dispatcher disp = mock(Dispatcher.class);
+    when(rmContext.getDispatcher()).thenReturn(disp);
+    when(disp.getEventHandler()).thenReturn(mDisp);
     rand = new Random();
     long seed = rand.nextLong();
     System.out.println(name.getMethodName() + " SEED: " + seed);
@@ -866,12 +871,12 @@ public class TestProportionalCapacityPreemptionPolicy {
   static class IsPreemptionRequestFor
       extends ArgumentMatcher<ContainerPreemptEvent> {
     private final ApplicationAttemptId appAttId;
-    private final ContainerPreemptEventType type;
+    private final SchedulerEventType type;
     IsPreemptionRequestFor(ApplicationAttemptId appAttId) {
       this(appAttId, PREEMPT_CONTAINER);
     }
     IsPreemptionRequestFor(ApplicationAttemptId appAttId,
-        ContainerPreemptEventType type) {
+        SchedulerEventType type) {
       this.appAttId = appAttId;
       this.type = type;
     }
@@ -888,7 +893,7 @@ public class TestProportionalCapacityPreemptionPolicy {
 
   ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
     ProportionalCapacityPreemptionPolicy policy =
-      new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
+      new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
     ParentQueue mRoot = buildMockRootQueue(rand, qData);
     when(mCS.getRootQueue()).thenReturn(mRoot);
 

+ 8 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java

@@ -50,13 +50,13 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -92,7 +93,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
   private Configuration conf = null;
   private CapacitySchedulerConfiguration csConf = null;
   private CapacityScheduler cs = null;
-  private EventHandler<ContainerPreemptEvent> mDisp = null;
+  private EventHandler<SchedulerEvent> mDisp = null;
   private ProportionalCapacityPreemptionPolicy policy = null;
   private Resource clusterResource = null;
 
@@ -125,11 +126,14 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
 
     rmContext = mock(RMContext.class);
     when(rmContext.getNodeLabelManager()).thenReturn(nlm);
+    Dispatcher disp = mock(Dispatcher.class);
+    when(rmContext.getDispatcher()).thenReturn(disp);
+    when(disp.getEventHandler()).thenReturn(mDisp);
     csConf = new CapacitySchedulerConfiguration();
     when(cs.getConfiguration()).thenReturn(csConf);
     when(cs.getRMContext()).thenReturn(rmContext);
 
-    policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock);
+    policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock);
     partitionToResource = new HashMap<>();
     nodeIdToSchedulerNodes = new HashMap<>();
     nameToCSQueues = new HashMap<>();
@@ -828,7 +832,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
     when(cs.getClusterResource()).thenReturn(clusterResource);
     mockApplications(appsConfig);
 
-    policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock);
+    policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock);
   }
 
   private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,