|
@@ -51,14 +51,11 @@ class EagerTaskInitializationListener extends JobInProgressListener {
|
|
|
while (true) {
|
|
|
try {
|
|
|
synchronized (jobInitQueue) {
|
|
|
- while (jobInitQueue.isEmpty() && !exitFlag) {
|
|
|
+ while (jobInitQueue.isEmpty()) {
|
|
|
jobInitQueue.wait();
|
|
|
}
|
|
|
- if (exitFlag) {
|
|
|
- break;
|
|
|
- }
|
|
|
+ job = jobInitQueue.remove(0);
|
|
|
}
|
|
|
- job = jobInitQueue.remove(0);
|
|
|
threadPool.execute(new InitJob(job));
|
|
|
} catch (InterruptedException t) {
|
|
|
LOG.info("JobInitManagerThread interrupted.");
|
|
@@ -97,7 +94,6 @@ class EagerTaskInitializationListener extends JobInProgressListener {
|
|
|
private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
|
|
|
private ExecutorService threadPool;
|
|
|
private int numThreads;
|
|
|
- private boolean exitFlag = false;
|
|
|
|
|
|
public EagerTaskInitializationListener(Configuration conf) {
|
|
|
numThreads = conf.getInt("mapred.jobinit.threads", DEFAULT_NUM_THREADS);
|
|
@@ -113,10 +109,7 @@ class EagerTaskInitializationListener extends JobInProgressListener {
|
|
|
public void terminate() throws IOException {
|
|
|
if (jobInitManagerThread != null && jobInitManagerThread.isAlive()) {
|
|
|
LOG.info("Stopping Job Init Manager thread");
|
|
|
- synchronized (jobInitQueue) {
|
|
|
- exitFlag = true;
|
|
|
- jobInitQueue.notify();
|
|
|
- }
|
|
|
+ jobInitManagerThread.interrupt();
|
|
|
try {
|
|
|
jobInitManagerThread.join();
|
|
|
} catch (InterruptedException ex) {
|