
YARN-10058. Handle uncaught exception for async-scheduling threads to prevent scheduler hangs (#7129). Contributed by Tao Yang.

Reviewed-by: Syed Shameerur Rahman <rhmanns@amazon.com>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
Tao Yang 4 months ago
parent
commit
aa5fe6f550

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -36,6 +36,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
@@ -3543,7 +3544,10 @@ public class CapacityScheduler extends
 
         this.asyncSchedulerThreads = new ArrayList<>();
         for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
-          asyncSchedulerThreads.add(new AsyncScheduleThread(cs));
+          AsyncScheduleThread ast = new AsyncScheduleThread(cs);
+          ast.setUncaughtExceptionHandler(
+              new RMCriticalThreadUncaughtExceptionHandler(cs.rmContext));
+          asyncSchedulerThreads.add(ast);
         }
         this.resourceCommitterService = new ResourceCommitterService(cs);
       }
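
The change wires each AsyncScheduleThread to an uncaught-exception handler built from the RM context. Without a handler, a RuntimeException thrown inside the scheduling loop kills the thread silently: the RM process stays up but allocation stops, which is the hang this patch prevents. Below is a minimal sketch of the underlying Thread.UncaughtExceptionHandler pattern; CriticalThreadFailureHandler and failureAction are hypothetical names used for illustration, not the internals of the actual RMCriticalThreadUncaughtExceptionHandler.

// Sketch only: illustrates the handler pattern the patch relies on.
public class CriticalThreadFailureHandler
    implements Thread.UncaughtExceptionHandler {

  // Callback that asks the owning service to fail over or shut down.
  private final Runnable failureAction;

  public CriticalThreadFailureHandler(Runnable failureAction) {
    this.failureAction = failureAction;
  }

  @Override
  public void uncaughtException(Thread t, Throwable e) {
    // Without a handler the JVM only prints the stack trace and the thread
    // dies, leaving the async scheduler silently stopped.
    System.err.println("Critical thread " + t.getName() + " failed: " + e);
    failureAction.run();
  }
}

// Wiring, mirroring the change above (schedulingLoop and the failure
// callback are placeholders):
//   Thread ast = new Thread(schedulingLoop, "AsyncScheduleThread");
//   ast.setUncaughtExceptionHandler(
//       new CriticalThreadFailureHandler(rm::handleCriticalThreadFailure));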

+ 52 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -135,6 +136,57 @@ public class TestRMHAForAsyncScheduler extends RMHATestBase {
     rm2.stop();
   }
 
+  @Test(timeout = 30000)
+  public void testAsyncScheduleThreadExit() throws Exception {
+    // start two RMs, and transit rm1 to active, rm2 to standby
+    startRMs();
+    // register NM
+    rm1.registerNode("192.1.1.1:1234", 8192, 8);
+    rm1.drainEvents();
+
+    // make sure async-scheduling thread is correct at beginning
+    checkAsyncSchedulerThreads(Thread.currentThread());
+
+    // test async-scheduling thread exit
+    try{
+      // set resource calculator to be null to simulate
+      // NPE in async-scheduling thread
+      CapacityScheduler cs =
+          (CapacityScheduler) rm1.getRMContext().getScheduler();
+      cs.setResourceCalculator(null);
+
+      // wait for rm1 to be transitioned to standby
+      GenericTestUtils.waitFor(() -> rm1.getRMContext().getHAServiceState()
+          == HAServiceProtocol.HAServiceState.STANDBY, 100, 5000);
+
+      // failover rm2 to rm1
+      HAServiceProtocol.StateChangeRequestInfo requestInfo =
+          new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+      rm2.adminService.transitionToStandby(requestInfo);
+      GenericTestUtils.waitFor(() -> {
+        try {
+          // this call may fail when rm1 is still initializing
+          // in StandByTransitionRunnable thread
+          rm1.adminService.transitionToActive(requestInfo);
+          return true;
+        } catch (Exception e) {
+          return false;
+        }
+      }, 100, 3000);
+
+      // wait for rm1 to be transitioned to active again
+      GenericTestUtils.waitFor(() -> rm1.getRMContext().getHAServiceState()
+          == HAServiceProtocol.HAServiceState.ACTIVE, 100, 5000);
+
+      // make sure async-scheduling thread is correct after failover
+      checkAsyncSchedulerThreads(Thread.currentThread());
+    } finally {
+      rm1.stop();
+      rm2.stop();
+    }
+  }
+
   private RMApp submitAppAndCheckLaunched(MockRM rm) throws Exception {
     MockRMAppSubmissionData data =
         MockRMAppSubmissionData.Builder.createWithMemory(200, rm)
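
The new HA test avoids fixed sleeps: after the simulated NPE, the handler installed above is expected to push rm1 to standby, and the test then drives a manual failover and polls until rm1 is active again. It uses GenericTestUtils.waitFor for that polling. A minimal sketch of the same polling pattern is shown below; awaitCondition is a hypothetical helper name, and the real utility additionally logs and reports richer error messages.

import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public final class TestWaitUtil {
  private TestWaitUtil() { }

  // Poll a boolean condition until it holds or the timeout expires.
  public static void awaitCondition(Supplier<Boolean> condition,
      long checkEveryMillis, long timeoutMillis)
      throws InterruptedException, TimeoutException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (System.currentTimeMillis() < deadline) {
      if (Boolean.TRUE.equals(condition.get())) {
        return;  // condition met, e.g. the RM reached the expected HA state
      }
      Thread.sleep(checkEveryMillis);  // poll again after the check interval
    }
    throw new TimeoutException(
        "Condition not met within " + timeoutMillis + " ms");
  }
}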

+ 33 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.contrib.java.lang.system.internal.NoExitSecurityManager;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -1072,6 +1074,37 @@ public class TestCapacitySchedulerAsyncScheduling {
     rm.stop();
   }
 
+  @Test(timeout = 30000)
+  public void testAsyncScheduleThreadExit() throws Exception {
+    // init RM & NM
+    final MockRM rm = new MockRM(conf);
+    rm.start();
+    rm.registerNode("192.168.0.1:1234", 8 * GB);
+    rm.drainEvents();
+
+    // Set no exit security manager to catch System.exit
+    SecurityManager originalSecurityManager = System.getSecurityManager();
+    NoExitSecurityManager noExitSecurityManager =
+        new NoExitSecurityManager(originalSecurityManager);
+    System.setSecurityManager(noExitSecurityManager);
+
+    // test async-scheduling thread exit
+    try{
+      // set resource calculator to be null to simulate
+      // NPE in async-scheduling thread
+      CapacityScheduler cs =
+          (CapacityScheduler) rm.getRMContext().getScheduler();
+      cs.setResourceCalculator(null);
+
+      // wait for RM to be shutdown until timeout
+      GenericTestUtils.waitFor(noExitSecurityManager::isCheckExitCalled,
+          100, 5000);
+    } finally {
+      System.setSecurityManager(originalSecurityManager);
+      rm.stop();
+    }
+  }
+
   private ResourceCommitRequest createAllocateFromReservedProposal(
       int containerId, Resource allocateResource, FiCaSchedulerApp schedulerApp,
       SchedulerNode allocateNode, SchedulerNode reservedNode,
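
In the non-HA test, the expected outcome of the uncaught NPE is a fatal RM exit rather than a standby transition, so the test installs the junit-contrib NoExitSecurityManager to intercept System.exit and then polls isCheckExitCalled(). A minimal sketch of what such a "no exit" SecurityManager does is given below; RecordingNoExitSecurityManager and its members are hypothetical names for illustration, not the library implementation.

import java.security.Permission;

public class RecordingNoExitSecurityManager extends SecurityManager {

  private volatile boolean exitCalled = false;

  @Override
  public void checkPermission(Permission perm) {
    // Allow everything else; only exit attempts are of interest here.
  }

  @Override
  public void checkExit(int status) {
    exitCalled = true;                  // remember that System.exit was attempted
    throw new SecurityException(        // abort the exit so the test JVM survives
        "Intercepted System.exit(" + status + ")");
  }

  public boolean isExitCalled() {
    return exitCalled;
  }
}

Seeing the exit attempt recorded confirms that the NPE in the async-scheduling thread reached the critical-thread handler and triggered the RM's shutdown path instead of leaving the scheduler hung.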