|
@@ -1840,7 +1840,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
//
|
|
//
|
|
|
|
|
|
// All the known jobs. (jobid->JobInProgress)
|
|
// All the known jobs. (jobid->JobInProgress)
|
|
- Map<JobID, JobInProgress> jobs = new TreeMap<JobID, JobInProgress>();
|
|
|
|
|
|
+ Map<JobID, JobInProgress> jobs =
|
|
|
|
+ Collections.synchronizedMap(new TreeMap<JobID, JobInProgress>());
|
|
|
|
|
|
// (user -> list of JobInProgress)
|
|
// (user -> list of JobInProgress)
|
|
TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =
|
|
TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =
|
|
@@ -3609,15 +3610,20 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
* of the JobTracker. But JobInProgress adds info that's useful for
|
|
* of the JobTracker. But JobInProgress adds info that's useful for
|
|
* the JobTracker alone.
|
|
* the JobTracker alone.
|
|
*/
|
|
*/
|
|
- public synchronized JobStatus submitJob(
|
|
|
|
- JobID jobId, String jobSubmitDir, TokenStorage ts) throws IOException {
|
|
|
|
- if(jobs.containsKey(jobId)) {
|
|
|
|
- //job already running, don't start twice
|
|
|
|
- return jobs.get(jobId).getStatus();
|
|
|
|
|
|
+ public JobStatus submitJob(JobID jobId, String jobSubmitDir, TokenStorage ts)
|
|
|
|
+ throws IOException {
|
|
|
|
+ JobInfo jobInfo = null;
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ if (jobs.containsKey(jobId)) {
|
|
|
|
+ // job already running, don't start twice
|
|
|
|
+ return jobs.get(jobId).getStatus();
|
|
|
|
+ }
|
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
|
+ jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),
|
|
|
|
+ new Path(jobSubmitDir));
|
|
}
|
|
}
|
|
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
|
|
- JobInfo jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),
|
|
|
|
- new Path(jobSubmitDir));
|
|
|
|
|
|
+ // Create the JobInProgress, do not lock the JobTracker since
|
|
|
|
+ // we are about to copy job.xml from HDFS
|
|
JobInProgress job = null;
|
|
JobInProgress job = null;
|
|
tokenStorage = ts;
|
|
tokenStorage = ts;
|
|
try {
|
|
try {
|
|
@@ -3626,43 +3632,45 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
throw new IOException(e);
|
|
throw new IOException(e);
|
|
}
|
|
}
|
|
|
|
|
|
- String queue = job.getProfile().getQueueName();
|
|
|
|
- if(!(queueManager.getQueues().contains(queue))) {
|
|
|
|
- job.fail();
|
|
|
|
- throw new IOException("Queue \"" + queue + "\" does not exist");
|
|
|
|
- }
|
|
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ String queue = job.getProfile().getQueueName();
|
|
|
|
+ if (!(queueManager.getQueues().contains(queue))) {
|
|
|
|
+ job.fail();
|
|
|
|
+ throw new IOException("Queue \"" + queue + "\" does not exist");
|
|
|
|
+ }
|
|
|
|
|
|
- // check for access
|
|
|
|
- try {
|
|
|
|
- checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
|
|
|
|
- } catch (IOException ioe) {
|
|
|
|
- LOG.warn("Access denied for user " + job.getJobConf().getUser()
|
|
|
|
- + ". Ignoring job " + jobId, ioe);
|
|
|
|
- job.fail();
|
|
|
|
- throw ioe;
|
|
|
|
- }
|
|
|
|
|
|
+ // check for access
|
|
|
|
+ try {
|
|
|
|
+ checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ LOG.warn("Access denied for user " + job.getJobConf().getUser()
|
|
|
|
+ + ". Ignoring job " + jobId, ioe);
|
|
|
|
+ job.fail();
|
|
|
|
+ throw ioe;
|
|
|
|
+ }
|
|
|
|
|
|
- // Check the job if it cannot run in the cluster because of invalid memory
|
|
|
|
- // requirements.
|
|
|
|
- try {
|
|
|
|
- checkMemoryRequirements(job);
|
|
|
|
- } catch (IOException ioe) {
|
|
|
|
- throw ioe;
|
|
|
|
- }
|
|
|
|
- boolean recovered = true; //TODO: Once the Job recovery code is there,
|
|
|
|
- //(MAPREDUCE-873) we
|
|
|
|
- //must pass the "recovered" flag accurately.
|
|
|
|
- //This is handled in the trunk/0.22
|
|
|
|
- if (!recovered) {
|
|
|
|
- //Store the information in a file so that the job can be recovered
|
|
|
|
- //later (if at all)
|
|
|
|
- Path jobDir = getSystemDirectoryForJob(jobId);
|
|
|
|
- FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
|
|
|
|
- FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
|
|
|
|
- jobInfo.write(out);
|
|
|
|
- out.close();
|
|
|
|
|
|
+ // Check the job if it cannot run in the cluster because of invalid memory
|
|
|
|
+ // requirements.
|
|
|
|
+ try {
|
|
|
|
+ checkMemoryRequirements(job);
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ throw ioe;
|
|
|
|
+ }
|
|
|
|
+ boolean recovered = true; // TODO: Once the Job recovery code is there,
|
|
|
|
+ // (MAPREDUCE-873) we
|
|
|
|
+ // must pass the "recovered" flag accurately.
|
|
|
|
+ // This is handled in the trunk/0.22
|
|
|
|
+ if (!recovered) {
|
|
|
|
+ // Store the information in a file so that the job can be recovered
|
|
|
|
+ // later (if at all)
|
|
|
|
+ Path jobDir = getSystemDirectoryForJob(jobId);
|
|
|
|
+ FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
|
|
|
|
+ FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
|
|
|
|
+ jobInfo.write(out);
|
|
|
|
+ out.close();
|
|
|
|
+ }
|
|
|
|
+ return addJob(jobId, job);
|
|
}
|
|
}
|
|
- return addJob(jobId, job);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -3941,10 +3949,23 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
completedJobStatusStore.store(job);
|
|
completedJobStatusStore.store(job);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Check if the <code>job</code> has been initialized.
|
|
|
|
+ *
|
|
|
|
+ * @param job {@link JobInProgress} to be checked
|
|
|
|
+ * @return <code>true</code> if the job has been initialized,
|
|
|
|
+ * <code>false</code> otherwise
|
|
|
|
+ */
|
|
|
|
+ private boolean isJobInited(JobInProgress job) {
|
|
|
|
+ return job.inited();
|
|
|
|
+ }
|
|
|
|
+
|
|
public JobProfile getJobProfile(JobID jobid) {
|
|
public JobProfile getJobProfile(JobID jobid) {
|
|
synchronized (this) {
|
|
synchronized (this) {
|
|
JobInProgress job = jobs.get(jobid);
|
|
JobInProgress job = jobs.get(jobid);
|
|
if (job != null) {
|
|
if (job != null) {
|
|
|
|
+ // Safe to call JobInProgress.getProfile while holding the lock
|
|
|
|
+ // on the JobTracker since it isn't a synchronized method
|
|
return job.getProfile();
|
|
return job.getProfile();
|
|
} else {
|
|
} else {
|
|
RetireJobInfo info = retireJobs.get(jobid);
|
|
RetireJobInfo info = retireJobs.get(jobid);
|
|
@@ -3955,6 +3976,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
}
|
|
}
|
|
return completedJobStatusStore.readJobProfile(jobid);
|
|
return completedJobStatusStore.readJobProfile(jobid);
|
|
}
|
|
}
|
|
|
|
+
|
|
public JobStatus getJobStatus(JobID jobid) {
|
|
public JobStatus getJobStatus(JobID jobid) {
|
|
if (null == jobid) {
|
|
if (null == jobid) {
|
|
LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
|
|
LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
|
|
@@ -3963,6 +3985,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
synchronized (this) {
|
|
synchronized (this) {
|
|
JobInProgress job = jobs.get(jobid);
|
|
JobInProgress job = jobs.get(jobid);
|
|
if (job != null) {
|
|
if (job != null) {
|
|
|
|
+ // Safe to call JobInProgress.getStatus while holding the lock
|
|
|
|
+ // on the JobTracker since it isn't a synchronized method
|
|
return job.getStatus();
|
|
return job.getStatus();
|
|
} else {
|
|
} else {
|
|
|
|
|
|
@@ -3974,19 +3998,24 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
}
|
|
}
|
|
return completedJobStatusStore.readJobStatus(jobid);
|
|
return completedJobStatusStore.readJobStatus(jobid);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private static final Counters EMPTY_COUNTERS = new Counters();
|
|
public Counters getJobCounters(JobID jobid) {
|
|
public Counters getJobCounters(JobID jobid) {
|
|
synchronized (this) {
|
|
synchronized (this) {
|
|
JobInProgress job = jobs.get(jobid);
|
|
JobInProgress job = jobs.get(jobid);
|
|
if (job != null) {
|
|
if (job != null) {
|
|
- return job.getCounters();
|
|
|
|
|
|
+ return isJobInited(job) ? job.getCounters() : EMPTY_COUNTERS;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return completedJobStatusStore.readCounters(jobid);
|
|
return completedJobStatusStore.readCounters(jobid);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
|
|
|
|
+
|
|
public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
|
|
public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
|
|
JobInProgress job = jobs.get(jobid);
|
|
JobInProgress job = jobs.get(jobid);
|
|
- if (job == null) {
|
|
|
|
- return new TaskReport[0];
|
|
|
|
|
|
+ if (job == null || !isJobInited(job)) {
|
|
|
|
+ return EMPTY_TASK_REPORTS;
|
|
} else {
|
|
} else {
|
|
Vector<TaskReport> reports = new Vector<TaskReport>();
|
|
Vector<TaskReport> reports = new Vector<TaskReport>();
|
|
Vector<TaskInProgress> completeMapTasks =
|
|
Vector<TaskInProgress> completeMapTasks =
|
|
@@ -4007,8 +4036,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
|
|
|
public synchronized TaskReport[] getReduceTaskReports(JobID jobid) {
|
|
public synchronized TaskReport[] getReduceTaskReports(JobID jobid) {
|
|
JobInProgress job = jobs.get(jobid);
|
|
JobInProgress job = jobs.get(jobid);
|
|
- if (job == null) {
|
|
|
|
- return new TaskReport[0];
|
|
|
|
|
|
+ if (job == null || !isJobInited(job)) {
|
|
|
|
+ return EMPTY_TASK_REPORTS;
|
|
} else {
|
|
} else {
|
|
Vector<TaskReport> reports = new Vector<TaskReport>();
|
|
Vector<TaskReport> reports = new Vector<TaskReport>();
|
|
Vector completeReduceTasks = job.reportTasksInProgress(false, true);
|
|
Vector completeReduceTasks = job.reportTasksInProgress(false, true);
|
|
@@ -4027,8 +4056,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
|
|
|
public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) {
|
|
public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) {
|
|
JobInProgress job = jobs.get(jobid);
|
|
JobInProgress job = jobs.get(jobid);
|
|
- if (job == null) {
|
|
|
|
- return new TaskReport[0];
|
|
|
|
|
|
+ if (job == null || !isJobInited(job)) {
|
|
|
|
+ return EMPTY_TASK_REPORTS;
|
|
} else {
|
|
} else {
|
|
Vector<TaskReport> reports = new Vector<TaskReport>();
|
|
Vector<TaskReport> reports = new Vector<TaskReport>();
|
|
Vector<TaskInProgress> completeTasks = job.reportCleanupTIPs(true);
|
|
Vector<TaskInProgress> completeTasks = job.reportCleanupTIPs(true);
|
|
@@ -4050,8 +4079,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
|
|
|
public synchronized TaskReport[] getSetupTaskReports(JobID jobid) {
|
|
public synchronized TaskReport[] getSetupTaskReports(JobID jobid) {
|
|
JobInProgress job = jobs.get(jobid);
|
|
JobInProgress job = jobs.get(jobid);
|
|
- if (job == null) {
|
|
|
|
- return new TaskReport[0];
|
|
|
|
|
|
+ if (job == null || !isJobInited(job)) {
|
|
|
|
+ return EMPTY_TASK_REPORTS;
|
|
} else {
|
|
} else {
|
|
Vector<TaskReport> reports = new Vector<TaskReport>();
|
|
Vector<TaskReport> reports = new Vector<TaskReport>();
|
|
Vector<TaskInProgress> completeTasks = job.reportSetupTIPs(true);
|
|
Vector<TaskInProgress> completeTasks = job.reportSetupTIPs(true);
|
|
@@ -4070,8 +4099,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
|
|
|
|
-
|
|
|
|
static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
|
|
static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
|
|
"mapred.cluster.map.memory.mb";
|
|
"mapred.cluster.map.memory.mb";
|
|
static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
|
|
static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
|
|
@@ -4087,21 +4114,22 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
* starting from fromEventId.
|
|
* starting from fromEventId.
|
|
* @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
|
|
* @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
|
|
*/
|
|
*/
|
|
- public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
|
|
|
|
|
|
+ public TaskCompletionEvent[] getTaskCompletionEvents(
|
|
JobID jobid, int fromEventId, int maxEvents) throws IOException{
|
|
JobID jobid, int fromEventId, int maxEvents) throws IOException{
|
|
- synchronized (this) {
|
|
|
|
- JobInProgress job = this.jobs.get(jobid);
|
|
|
|
- if (null != job) {
|
|
|
|
- if (job.inited()) {
|
|
|
|
- return job.getTaskCompletionEvents(fromEventId, maxEvents);
|
|
|
|
- } else {
|
|
|
|
- return EMPTY_EVENTS;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ JobInProgress job = this.jobs.get(jobid);
|
|
|
|
+
|
|
|
|
+ if (null != job) {
|
|
|
|
+ return isJobInited(job) ?
|
|
|
|
+ job.getTaskCompletionEvents(fromEventId, maxEvents) :
|
|
|
|
+ TaskCompletionEvent.EMPTY_ARRAY;
|
|
}
|
|
}
|
|
- return completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
|
|
|
|
|
|
+
|
|
|
|
+ return completedJobStatusStore.readJobTaskCompletionEvents(jobid,
|
|
|
|
+ fromEventId,
|
|
|
|
+ maxEvents);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static final String[] EMPTY_TASK_DIAGNOSTICS = new String[0];
|
|
/**
|
|
/**
|
|
* Get the diagnostics for a given task
|
|
* Get the diagnostics for a given task
|
|
* @param taskId the id of the task
|
|
* @param taskId the id of the task
|
|
@@ -4113,7 +4141,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
JobID jobId = taskId.getJobID();
|
|
JobID jobId = taskId.getJobID();
|
|
TaskID tipId = taskId.getTaskID();
|
|
TaskID tipId = taskId.getTaskID();
|
|
JobInProgress job = jobs.get(jobId);
|
|
JobInProgress job = jobs.get(jobId);
|
|
- if (job != null) {
|
|
|
|
|
|
+ if (job != null && isJobInited(job)) {
|
|
TaskInProgress tip = job.getTaskInProgress(tipId);
|
|
TaskInProgress tip = job.getTaskInProgress(tipId);
|
|
if (tip != null) {
|
|
if (tip != null) {
|
|
taskDiagnosticInfo = tip.getDiagnosticInfo(taskId);
|
|
taskDiagnosticInfo = tip.getDiagnosticInfo(taskId);
|
|
@@ -4121,8 +4149,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- return ((taskDiagnosticInfo == null) ? new String[0]
|
|
|
|
- : taskDiagnosticInfo.toArray(new String[0]));
|
|
|
|
|
|
+ return ((taskDiagnosticInfo == null) ? EMPTY_TASK_DIAGNOSTICS :
|
|
|
|
+ taskDiagnosticInfo.toArray(new String[taskDiagnosticInfo.size()]));
|
|
}
|
|
}
|
|
|
|
|
|
/** Get all the TaskStatuses from the tipid. */
|
|
/** Get all the TaskStatuses from the tipid. */
|
|
@@ -4722,8 +4750,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
|
|
|
boolean invalidJob = false;
|
|
boolean invalidJob = false;
|
|
String msg = "";
|
|
String msg = "";
|
|
- long maxMemForMapTask = job.getJobConf().getMemoryForMapTask();
|
|
|
|
- long maxMemForReduceTask = job.getJobConf().getMemoryForReduceTask();
|
|
|
|
|
|
+ long maxMemForMapTask = job.getMemoryForMapTask();
|
|
|
|
+ long maxMemForReduceTask = job.getMemoryForReduceTask();
|
|
|
|
|
|
if (maxMemForMapTask == JobConf.DISABLED_MEMORY_LIMIT
|
|
if (maxMemForMapTask == JobConf.DISABLED_MEMORY_LIMIT
|
|
|| maxMemForReduceTask == JobConf.DISABLED_MEMORY_LIMIT) {
|
|
|| maxMemForReduceTask == JobConf.DISABLED_MEMORY_LIMIT) {
|