|
@@ -32,6 +32,7 @@ import java.util.Map.Entry;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.concurrent.ScheduledFuture;
|
|
import java.util.concurrent.ScheduledFuture;
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
|
+import java.util.concurrent.ThreadFactory;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.locks.Lock;
|
|
import java.util.concurrent.locks.Lock;
|
|
import java.util.concurrent.locks.ReadWriteLock;
|
|
import java.util.concurrent.locks.ReadWriteLock;
|
|
@@ -129,6 +130,8 @@ import org.apache.hadoop.yarn.state.StateMachine;
|
|
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
|
import org.apache.hadoop.yarn.state.StateMachineFactory;
|
|
import org.apache.hadoop.yarn.util.Clock;
|
|
import org.apache.hadoop.yarn.util.Clock;
|
|
|
|
|
|
|
|
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
|
+
|
|
/** Implementation of Job interface. Maintains the state machines of Job.
|
|
/** Implementation of Job interface. Maintains the state machines of Job.
|
|
* The read and write calls use ReadWriteLock for concurrency.
|
|
* The read and write calls use ReadWriteLock for concurrency.
|
|
*/
|
|
*/
|
|
@@ -644,8 +647,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|
|
|
|
|
private JobStateInternal forcedState = null;
|
|
private JobStateInternal forcedState = null;
|
|
|
|
|
|
- //Executor used for running future tasks. Setting thread pool size to 1
|
|
|
|
- private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
|
|
|
|
|
|
+ //Executor used for running future tasks.
|
|
|
|
+ private ScheduledThreadPoolExecutor executor;
|
|
private ScheduledFuture failWaitTriggerScheduledFuture;
|
|
private ScheduledFuture failWaitTriggerScheduledFuture;
|
|
|
|
|
|
private JobState lastNonFinalState = JobState.NEW;
|
|
private JobState lastNonFinalState = JobState.NEW;
|
|
@@ -687,6 +690,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
|
|
this.aclsManager = new JobACLsManager(conf);
|
|
this.aclsManager = new JobACLsManager(conf);
|
|
this.username = System.getProperty("user.name");
|
|
this.username = System.getProperty("user.name");
|
|
this.jobACLs = aclsManager.constructJobACLs(conf);
|
|
this.jobACLs = aclsManager.constructJobACLs(conf);
|
|
|
|
+
|
|
|
|
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
|
|
|
+ .setNameFormat("Job Fail Wait Timeout Monitor #%d")
|
|
|
|
+ .setDaemon(true)
|
|
|
|
+ .build();
|
|
|
|
+ this.executor = new ScheduledThreadPoolExecutor(1, threadFactory);
|
|
|
|
+
|
|
// This "this leak" is okay because the retained pointer is in an
|
|
// This "this leak" is okay because the retained pointer is in an
|
|
// instance variable.
|
|
// instance variable.
|
|
stateMachine = stateMachineFactory.make(this);
|
|
stateMachine = stateMachineFactory.make(this);
|