瀏覽代碼

YARN-7388. TestAMRestart should be scheduler agnostic.

(cherry picked from commit a1382a18dff8a70aa25240d6fbba6e22832a7679)
(cherry picked from commit 91a7f8d2463de0a94ded0884143e4661c29868a4)
(cherry picked from commit 3fbe67d3e1a93b998e3d056ebd9008e17a35f8b0)
Haibo Chen 7 年之前
父節點
當前提交
288d9506f0

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -1336,6 +1336,14 @@ public abstract class AbstractYarnScheduler
     return -1;
   }
 
+  /**
+   * Kill a RMContainer. This is meant to be called in tests only to simulate
+   * AM container failures.
+   * @param container the container to kill
+   */
+  @VisibleForTesting
+  public abstract void killContainer(RMContainer container);
+
   /**
    * Update internal state of the scheduler.  This can be useful for scheduler
    * implementations that maintain some state that needs to be periodically

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java

@@ -99,6 +99,19 @@ public class SchedulerUtils {
         ContainerExitStatus.ABORTED, diagnostics);
   }
 
+
+  /**
+   * Utility to create a {@link ContainerStatus} for killed containers.
+   * @param containerId {@link ContainerId} of the killed container.
+   * @param diagnostics diagnostic message
+   * @return <code>ContainerStatus</code> for a killed container
+   */
+  public static ContainerStatus createKilledContainerStatus(
+      ContainerId containerId, String diagnostics) {
+    return createAbnormalContainerStatus(containerId,
+        ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, diagnostics);
+  }
+
   /**
    * Utility to create a {@link ContainerStatus} during exceptional
    * circumstances.

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -1760,6 +1760,12 @@ public class CapacityScheduler extends
     }
   }
 
+  @VisibleForTesting
+  @Override
+  public void killContainer(RMContainer container) {
+    markContainerForKillable(container);
+  }
+
   public void markContainerForKillable(
       RMContainer killableContainer) {
     try {

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -793,6 +793,16 @@ public class FairScheduler extends
         incrAllocation);
   }
 
+  @VisibleForTesting
+  @Override
+  public void killContainer(RMContainer container) {
+    ContainerStatus status = SchedulerUtils.createKilledContainerStatus(
+        container.getContainerId(),
+        "Killed by RM to simulate an AM container failure");
+    LOG.info("Killing container " + container);
+    completedContainer(container, status, RMContainerEventType.KILL);
+  }
+
   @Override
   public Allocation allocate(ApplicationAttemptId appAttemptId,
       List<ResourceRequest> ask, List<ContainerId> release,

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -984,6 +984,16 @@ public class FifoScheduler extends
     updateAvailableResourcesMetrics();
   }
 
+  @VisibleForTesting
+  @Override
+  public void killContainer(RMContainer container) {
+    ContainerStatus status = SchedulerUtils.createKilledContainerStatus(
+        container.getContainerId(),
+        "Killed by RM to simulate an AM container failure");
+    LOG.info("Killing container " + container);
+    completedContainer(container, status, RMContainerEventType.KILL);
+  }
+
   @Override
   public synchronized void recoverContainersOnNode(
       List<NMContainerStatus> containerReports, RMNode nm) {

+ 12 - 24
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java

@@ -51,10 +51,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
@@ -376,8 +374,6 @@ public class TestAMRestart {
   @Test(timeout = 100000)
   public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-      ResourceScheduler.class);
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
     conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
     conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
@@ -389,12 +385,12 @@ public class TestAMRestart {
     RMApp app1 = rm1.submitApp(200);
     RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-    CapacityScheduler scheduler =
-        (CapacityScheduler) rm1.getResourceScheduler();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm1.getResourceScheduler();
     ContainerId amContainer =
         ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
     // Preempt the next attempt;
-    scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
+    scheduler.killContainer(scheduler.getRMContainer(amContainer));
 
     rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
     TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
@@ -414,7 +410,7 @@ public class TestAMRestart {
     // Preempt the second attempt.
     ContainerId amContainer2 =
         ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
-    scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2));
+    scheduler.killContainer(scheduler.getRMContainer(amContainer2));
 
     rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
     TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
@@ -503,8 +499,6 @@ public class TestAMRestart {
   @Test(timeout = 100000)
   public void testMaxAttemptOneMeansOne() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
     conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
     conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
     conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
@@ -516,12 +510,12 @@ public class TestAMRestart {
     RMApp app1 = rm1.submitApp(200);
     RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-    CapacityScheduler scheduler =
-        (CapacityScheduler) rm1.getResourceScheduler();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm1.getResourceScheduler();
     ContainerId amContainer =
         ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
     // Preempt the attempt;
-    scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
+    scheduler.killContainer(scheduler.getRMContainer(amContainer));
 
     rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
     TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
@@ -539,8 +533,6 @@ public class TestAMRestart {
   @Test(timeout = 60000)
   public void testPreemptedAMRestartOnRMRestart() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-      ResourceScheduler.class);
     conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
     conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
 
@@ -556,8 +548,8 @@ public class TestAMRestart {
     RMApp app1 = rm1.submitApp(200);
     RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-    CapacityScheduler scheduler =
-        (CapacityScheduler) rm1.getResourceScheduler();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm1.getResourceScheduler();
     ContainerId amContainer =
         ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
 
@@ -577,7 +569,7 @@ public class TestAMRestart {
 
     // Forcibly preempt the am container;
     amContainer = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
-    scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer));
+    scheduler.killContainer(scheduler.getRMContainer(amContainer));
 
     rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
     Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry());
@@ -619,8 +611,6 @@ public class TestAMRestart {
   public void testRMRestartOrFailoverNotCountedForAMFailures()
       throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-      ResourceScheduler.class);
     conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
     conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
 
@@ -631,8 +621,8 @@ public class TestAMRestart {
     MockRM rm1 = new MockRM(conf);
     MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
     rm1.start();
-    CapacityScheduler scheduler =
-        (CapacityScheduler) rm1.getResourceScheduler();
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm1.getResourceScheduler();
     MockNM nm1 =
         new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
     nm1.registerNode();
@@ -694,8 +684,6 @@ public class TestAMRestart {
   @Test (timeout = 120000)
   public void testRMAppAttemptFailuresValidityInterval() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-      ResourceScheduler.class);
     conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
     conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);