|
@@ -19,12 +19,17 @@
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.amlauncher;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.ThreadFactory;
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.service.AbstractService;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
@@ -34,7 +39,7 @@ public class ApplicationMasterLauncher extends AbstractService implements
|
|
|
EventHandler<AMLauncherEvent> {
|
|
|
private static final Log LOG = LogFactory.getLog(
|
|
|
ApplicationMasterLauncher.class);
|
|
|
- private final ThreadPoolExecutor launcherPool;
|
|
|
+ private ThreadPoolExecutor launcherPool;
|
|
|
private LauncherThread launcherHandlingThread;
|
|
|
|
|
|
private final BlockingQueue<Runnable> masterEvents
|
|
@@ -45,11 +50,30 @@ public class ApplicationMasterLauncher extends AbstractService implements
|
|
|
public ApplicationMasterLauncher(RMContext context) {
|
|
|
super(ApplicationMasterLauncher.class.getName());
|
|
|
this.context = context;
|
|
|
- this.launcherPool = new ThreadPoolExecutor(10, 10, 1,
|
|
|
- TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
|
|
|
this.launcherHandlingThread = new LauncherThread();
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ protected void serviceInit(Configuration conf) throws Exception {
|
|
|
+ int threadCount = conf.getInt(
|
|
|
+ YarnConfiguration.RM_AMLAUNCHER_THREAD_COUNT,
|
|
|
+ YarnConfiguration.DEFAULT_RM_AMLAUNCHER_THREAD_COUNT);
|
|
|
+ ThreadFactory tf = new ThreadFactoryBuilder()
|
|
|
+ .setNameFormat("ApplicationMasterLauncher #%d")
|
|
|
+ .build();
|
|
|
+ launcherPool = new ThreadPoolExecutor(threadCount, threadCount, 1,
|
|
|
+ TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
|
|
|
+ launcherPool.setThreadFactory(tf);
|
|
|
+
|
|
|
+ Configuration newConf = new YarnConfiguration(conf);
|
|
|
+ newConf.setInt(CommonConfigurationKeysPublic.
|
|
|
+ IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
|
|
|
+ conf.getInt(YarnConfiguration.RM_NODEMANAGER_CONNECT_RETIRES,
|
|
|
+ YarnConfiguration.DEFAULT_RM_NODEMANAGER_CONNECT_RETIRES));
|
|
|
+ setConfig(newConf);
|
|
|
+ super.serviceInit(newConf);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected void serviceStart() throws Exception {
|
|
|
launcherHandlingThread.start();
|