|
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
|
|
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
|
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
|
@@ -1266,12 +1267,16 @@ public class FairScheduler extends
|
|
|
|
|
|
updateThread = new UpdateThread();
|
|
updateThread = new UpdateThread();
|
|
updateThread.setName("FairSchedulerUpdateThread");
|
|
updateThread.setName("FairSchedulerUpdateThread");
|
|
|
|
+ updateThread.setUncaughtExceptionHandler(
|
|
|
|
+ new RMCriticalThreadUncaughtExceptionHandler(rmContext));
|
|
updateThread.setDaemon(true);
|
|
updateThread.setDaemon(true);
|
|
|
|
|
|
if (continuousSchedulingEnabled) {
|
|
if (continuousSchedulingEnabled) {
|
|
// start continuous scheduling thread
|
|
// start continuous scheduling thread
|
|
schedulingThread = new ContinuousSchedulingThread();
|
|
schedulingThread = new ContinuousSchedulingThread();
|
|
schedulingThread.setName("FairSchedulerContinuousScheduling");
|
|
schedulingThread.setName("FairSchedulerContinuousScheduling");
|
|
|
|
+ schedulingThread.setUncaughtExceptionHandler(
|
|
|
|
+ new RMCriticalThreadUncaughtExceptionHandler(rmContext));
|
|
schedulingThread.setDaemon(true);
|
|
schedulingThread.setDaemon(true);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1297,6 +1302,8 @@ public class FairScheduler extends
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
protected void createPreemptionThread() {
|
|
protected void createPreemptionThread() {
|
|
preemptionThread = new FSPreemptionThread(this);
|
|
preemptionThread = new FSPreemptionThread(this);
|
|
|
|
+ preemptionThread.setUncaughtExceptionHandler(
|
|
|
|
+ new RMCriticalThreadUncaughtExceptionHandler(rmContext));
|
|
}
|
|
}
|
|
|
|
|
|
private void updateReservationThreshold() {
|
|
private void updateReservationThreshold() {
|