|
@@ -73,6 +73,8 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|
|
|
|
|
private AppContext context;
|
|
|
private ThreadPoolExecutor launcherPool;
|
|
|
+ private static final int INITIAL_POOL_SIZE = 10;
|
|
|
+ private int limitOnPoolSize;
|
|
|
private Thread eventHandlingThread;
|
|
|
private BlockingQueue<ContainerLauncherEvent> eventQueue =
|
|
|
new LinkedBlockingQueue<ContainerLauncherEvent>();
|
|
@@ -96,16 +98,17 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|
|
YarnConfiguration.YARN_SECURITY_INFO,
|
|
|
ContainerManagerSecurityInfo.class, SecurityInfo.class);
|
|
|
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
|
|
|
+ this.limitOnPoolSize = conf.getInt(
|
|
|
+ MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
|
|
|
+ MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
|
|
|
super.init(myLocalConfig);
|
|
|
}
|
|
|
|
|
|
public void start() {
|
|
|
- launcherPool =
|
|
|
- new ThreadPoolExecutor(getConfig().getInt(
|
|
|
- MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT, 10),
|
|
|
- Integer.MAX_VALUE, 1, TimeUnit.HOURS,
|
|
|
- new LinkedBlockingQueue<Runnable>());
|
|
|
- launcherPool.prestartAllCoreThreads(); // Wait for work.
|
|
|
+ // Start with a default core-pool size of 10 and change it dynamically.
|
|
|
+ launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
|
|
|
+ Integer.MAX_VALUE, 1, TimeUnit.HOURS,
|
|
|
+ new LinkedBlockingQueue<Runnable>());
|
|
|
eventHandlingThread = new Thread(new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
@@ -117,6 +120,26 @@ public class ContainerLauncherImpl extends AbstractService implements
|
|
|
LOG.error("Returning, interrupted : " + e);
|
|
|
return;
|
|
|
}
|
|
|
+
|
|
|
+ int poolSize = launcherPool.getCorePoolSize();
|
|
|
+
|
|
|
+ // See if we need up the pool size only if haven't reached the
|
|
|
+ // maximum limit yet.
|
|
|
+ if (poolSize != limitOnPoolSize) {
|
|
|
+
|
|
|
+ // nodes where containers will run at *this* point of time. This is
|
|
|
+ // *not* the cluster size and doesn't need to be.
|
|
|
+ int numNodes = ugiMap.size();
|
|
|
+ int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
|
|
|
+
|
|
|
+ if (poolSize <= idealPoolSize) {
|
|
|
+ // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the
|
|
|
+ // later is just a buffer so we are not always increasing the
|
|
|
+ // pool-size
|
|
|
+ launcherPool.setCorePoolSize(idealPoolSize + INITIAL_POOL_SIZE);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// the events from the queue are handled in parallel
|
|
|
// using a thread pool
|
|
|
launcherPool.execute(new EventProcessor(event));
|