|
@@ -28,8 +28,8 @@ import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
-import java.util.concurrent.ExecutorService;
|
|
|
-import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -275,12 +275,19 @@ public class FairScheduler extends TaskScheduler {
|
|
|
|
|
|
private class JobInitializer {
|
|
|
private final int DEFAULT_NUM_THREADS = 1;
|
|
|
- private ExecutorService threadPool;
|
|
|
+ private ThreadPoolExecutor threadPool;
|
|
|
private TaskTrackerManager ttm;
|
|
|
public JobInitializer(Configuration conf, TaskTrackerManager ttm) {
|
|
|
int numThreads = conf.getInt("mapred.jobinit.threads",
|
|
|
DEFAULT_NUM_THREADS);
|
|
|
- threadPool = Executors.newFixedThreadPool(numThreads);
|
|
|
+ threadPool = new ThreadPoolExecutor(numThreads, numThreads, 0L,
|
|
|
+ TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
|
|
|
+ // Pre-starting all threads to ensure the threads are executed as JobTracker
|
|
|
+ // instead of the user submitted the job, otherwise job initialization fails
|
|
|
+ // when security is enabled
|
|
|
+ if (threadPool.prestartAllCoreThreads() != numThreads) {
|
|
|
+ throw new RuntimeException("Failed to pre-start threads in JobInitializer");
|
|
|
+ }
|
|
|
this.ttm = ttm;
|
|
|
}
|
|
|
public void initJob(JobInfo jobInfo, JobInProgress job) {
|