|
@@ -1098,31 +1098,45 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
try {
|
|
|
// 1. Create the job object
|
|
|
JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
|
|
|
- String logFileName;
|
|
|
- Path jobHistoryFilePath;
|
|
|
|
|
|
- // 2. Get the log file and the file path
|
|
|
- logFileName =
|
|
|
+ // 2. Check if the user has appropriate access
|
|
|
+ // Get the user group info for the job's owner
|
|
|
+ UserGroupInformation ugi =
|
|
|
+ UserGroupInformation.readFrom(job.getJobConf());
|
|
|
+ LOG.info("Submitting job " + id + " on behalf of user "
|
|
|
+ + ugi.getUserName() + " in groups : "
|
|
|
+ + StringUtils.arrayToString(ugi.getGroupNames()));
|
|
|
+
|
|
|
+ // check the access
|
|
|
+ try {
|
|
|
+ checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB, ugi);
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.warn("Access denied for user " + ugi.getUserName()
|
|
|
+ + " in groups : ["
|
|
|
+ + StringUtils.arrayToString(ugi.getGroupNames()) + "]");
|
|
|
+ throw t;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 3. Get the log file and the file path
|
|
|
+ String logFileName =
|
|
|
JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
|
|
|
- jobHistoryFilePath =
|
|
|
+ Path jobHistoryFilePath =
|
|
|
JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
|
|
|
|
|
|
- // 3. Recover the history file. This involved
|
|
|
+ // 4. Recover the history file. This involved
|
|
|
// - deleting file.recover if file exists
|
|
|
// - renaming file.recover to file if file doesnt exist
|
|
|
// This makes sure that the (master) file exists
|
|
|
JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(),
|
|
|
jobHistoryFilePath);
|
|
|
|
|
|
- // 4. Cache the history file name as it costs one dfs access
|
|
|
+ // 5. Cache the history file name as it costs one dfs access
|
|
|
jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
|
|
|
|
|
|
- // 5. Sumbit the job to the jobtracker
|
|
|
+ // 6. Sumbit the job to the jobtracker
|
|
|
addJob(id, job);
|
|
|
} catch (Throwable t) {
|
|
|
- LOG.warn("Failed to recover job " + id + " history details."
|
|
|
- + " Ignoring.", t);
|
|
|
- // TODO : remove job details from the system directory
|
|
|
+ LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);
|
|
|
idIter.remove();
|
|
|
continue;
|
|
|
}
|
|
@@ -1157,8 +1171,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(),
|
|
|
listener, fs);
|
|
|
} catch (Throwable t) {
|
|
|
- LOG.info("JobTracker failed to recover job " + pJob.getJobID()
|
|
|
- + " from history. Ignoring it.", t);
|
|
|
+ LOG.info("Error reading history file of job " + pJob.getJobID()
|
|
|
+ + ". Ignoring the error and continuing.", t);
|
|
|
}
|
|
|
|
|
|
// 3. Close the listener
|
|
@@ -1179,7 +1193,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
} catch (Throwable t) {
|
|
|
LOG.warn("Failed to delete log file (" + logFileName + ") for job "
|
|
|
- + id + ". Ignoring it.", t);
|
|
|
+ + id + ". Continuing.", t);
|
|
|
}
|
|
|
|
|
|
if (pJob.isComplete()) {
|
|
@@ -2763,7 +2777,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
JobInProgress job = new JobInProgress(jobId, this, this.conf);
|
|
|
|
|
|
// check for access
|
|
|
- checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
|
|
|
+ try {
|
|
|
+ checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ LOG.warn("Access denied for user " + job.getJobConf().getUser()
|
|
|
+ + ". Ignoring job " + jobId, ioe);
|
|
|
+ new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
|
|
|
+ throw ioe;
|
|
|
+ }
|
|
|
|
|
|
return addJob(jobId, job);
|
|
|
}
|
|
@@ -2794,15 +2815,18 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
|
|
|
// Check whether the specified operation can be performed
|
|
|
- // related to the job. If ownerAllowed is true, then an owner
|
|
|
- // of the job can perform the operation irrespective of
|
|
|
- // access control.
|
|
|
+ // related to the job.
|
|
|
private void checkAccess(JobInProgress job,
|
|
|
QueueManager.QueueOperation oper)
|
|
|
throws IOException {
|
|
|
// get the user group info
|
|
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
|
|
|
+ checkAccess(job, oper, ugi);
|
|
|
+ }
|
|
|
|
|
|
+ // use the passed ugi for checking the access
|
|
|
+ private void checkAccess(JobInProgress job, QueueManager.QueueOperation oper,
|
|
|
+ UserGroupInformation ugi) throws IOException {
|
|
|
// get the queue
|
|
|
String queue = job.getProfile().getQueueName();
|
|
|
if (!queueManager.hasAccess(queue, job, oper, ugi)) {
|
|
@@ -3148,6 +3172,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
return jobs.get(jobid);
|
|
|
}
|
|
|
|
|
|
+ // Get the job directory in system directory
|
|
|
+ Path getSystemDirectoryForJob(JobID id) {
|
|
|
+ return new Path(getSystemDir(), id.toString());
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Change the run-time priority of the given job.
|
|
|
* @param jobId job id
|