|
@@ -18,7 +18,14 @@
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
|
|
|
+import java.io.BufferedReader;
|
|
|
+import java.io.BufferedWriter;
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileInputStream;
|
|
|
+import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.InputStreamReader;
|
|
|
+import java.io.OutputStreamWriter;
|
|
|
import java.net.BindException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.UnknownHostException;
|
|
@@ -50,6 +57,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.LocalDirAllocator;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.http.HttpServer;
|
|
@@ -125,6 +133,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
private final List<JobInProgressListener> jobInProgressListeners =
|
|
|
new CopyOnWriteArrayList<JobInProgressListener>();
|
|
|
|
|
|
+ private static final LocalDirAllocator lDirAlloc =
|
|
|
+ new LocalDirAllocator("mapred.local.dir");
|
|
|
// system directories are world-wide readable and owner readable
|
|
|
final static FsPermission SYSTEM_DIR_PERMISSION =
|
|
|
FsPermission.createImmutable((short) 0733); // rwx-wx-wx
|
|
@@ -1257,13 +1267,39 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
// I. Init the jobs and cache the recovered job history filenames
|
|
|
Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
|
|
|
Iterator<JobID> idIter = jobsToRecover.iterator();
|
|
|
+ JobInProgress job = null;
|
|
|
+ File jobIdFile = null;
|
|
|
while (idIter.hasNext()) {
|
|
|
JobID id = idIter.next();
|
|
|
LOG.info("Trying to recover details of job " + id);
|
|
|
try {
|
|
|
- // 1. Create the job object
|
|
|
- JobInProgress job =
|
|
|
- new JobInProgress(id, JobTracker.this, conf, restartCount);
|
|
|
+ // 1. Recover job owner and create JIP
|
|
|
+ jobIdFile =
|
|
|
+ new File(lDirAlloc.getLocalPathToRead(SUBDIR + "/" + id, conf).toString());
|
|
|
+
|
|
|
+ String user = null;
|
|
|
+ if (jobIdFile != null && jobIdFile.exists()) {
|
|
|
+ LOG.info("File " + jobIdFile + " exists for job " + id);
|
|
|
+ FileInputStream in = new FileInputStream(jobIdFile);
|
|
|
+ BufferedReader reader = null;
|
|
|
+ try {
|
|
|
+ reader = new BufferedReader(new InputStreamReader(in));
|
|
|
+ user = reader.readLine();
|
|
|
+ LOG.info("Recovered user " + user + " for job " + id);
|
|
|
+ } finally {
|
|
|
+ if (reader != null) {
|
|
|
+ reader.close();
|
|
|
+ }
|
|
|
+ in.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (user == null) {
|
|
|
+ throw new RuntimeException("Incomplete job " + id);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Create the job
|
|
|
+ job = new JobInProgress(id, JobTracker.this, conf, user,
|
|
|
+ restartCount);
|
|
|
|
|
|
// 2. Check if the user has appropriate access
|
|
|
// Get the user group info for the job's owner
|
|
@@ -1309,6 +1345,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
} catch (Throwable t) {
|
|
|
LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);
|
|
|
idIter.remove();
|
|
|
+ if (jobIdFile != null) {
|
|
|
+ jobIdFile.delete();
|
|
|
+ jobIdFile = null;
|
|
|
+ }
|
|
|
+ if (job != null) {
|
|
|
+ job.fail();
|
|
|
+ job = null;
|
|
|
+ }
|
|
|
continue;
|
|
|
}
|
|
|
}
|
|
@@ -1475,6 +1519,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
Map<String, Node> hostnameToNodeMap =
|
|
|
Collections.synchronizedMap(new TreeMap<String, Node>());
|
|
|
|
|
|
+ // job-id->username during staging
|
|
|
+ Map<JobID, String> jobToUserMap =
|
|
|
+ Collections.synchronizedMap(new TreeMap<JobID, String>());
|
|
|
+
|
|
|
// Number of resolved entries
|
|
|
int numResolved;
|
|
|
|
|
@@ -1732,7 +1780,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
|
|
|
// Same with 'localDir' except it's always on the local disk.
|
|
|
- jobConf.deleteLocalFiles(SUBDIR);
|
|
|
+ if (!hasRestarted) {
|
|
|
+ jobConf.deleteLocalFiles(SUBDIR);
|
|
|
+ }
|
|
|
|
|
|
// Initialize history again if it is not initialized
|
|
|
// because history was on dfs and namenode was in safemode.
|
|
@@ -2117,6 +2167,17 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
// mark the job for cleanup at all the trackers
|
|
|
addJobForCleanup(id);
|
|
|
|
|
|
+ try {
|
|
|
+ File userFileForJob =
|
|
|
+ new File(lDirAlloc.getLocalPathToRead(SUBDIR + "/" + id,
|
|
|
+ conf).toString());
|
|
|
+ if (userFileForJob != null) {
|
|
|
+ userFileForJob.delete();
|
|
|
+ }
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ LOG.info("Failed to delete job id mapping for job " + id, ioe);
|
|
|
+ }
|
|
|
+
|
|
|
// add the blacklisted trackers to potentially faulty list
|
|
|
if (job.getStatus().getRunState() == JobStatus.SUCCEEDED) {
|
|
|
if (job.getNoOfBlackListedTrackers() > 0) {
|
|
@@ -2989,7 +3050,17 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
* Allocates a new JobId string.
|
|
|
*/
|
|
|
public synchronized JobID getNewJobId() throws IOException {
|
|
|
- return new JobID(getTrackerIdentifier(), nextJobId++);
|
|
|
+ JobID id = new JobID(getTrackerIdentifier(), nextJobId++);
|
|
|
+
|
|
|
+ // get the user group info
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
|
|
|
+
|
|
|
+ // mark the user for this id
|
|
|
+ jobToUserMap.put(id, ugi.getUserName());
|
|
|
+
|
|
|
+ LOG.info("Job id " + id + " assigned to user " + ugi.getUserName());
|
|
|
+
|
|
|
+ return id;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -3005,12 +3076,59 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
//job already running, don't start twice
|
|
|
return jobs.get(jobId).getStatus();
|
|
|
}
|
|
|
+
|
|
|
+ // check if the owner is uploading the splits or not
|
|
|
+ // get the user group info
|
|
|
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
|
|
|
+
|
|
|
+ // check if the user invoking this api is the owner of this job
|
|
|
+ if (!jobToUserMap.get(jobId).equals(ugi.getUserName())) {
|
|
|
+ throw new IOException("User " + ugi.getUserName()
|
|
|
+ + " is not the owner of the job " + jobId);
|
|
|
+ }
|
|
|
|
|
|
- JobInProgress job = new JobInProgress(jobId, this, this.conf);
|
|
|
+ jobToUserMap.remove(jobId);
|
|
|
+
|
|
|
+ // persist
|
|
|
+ File userFileForJob =
|
|
|
+ new File(lDirAlloc.getLocalPathForWrite(SUBDIR + "/" + jobId,
|
|
|
+ conf).toString());
|
|
|
+ if (userFileForJob == null) {
|
|
|
+ LOG.info("Failed to create job-id file for job " + jobId + " at " + userFileForJob);
|
|
|
+ } else {
|
|
|
+ FileOutputStream fout = new FileOutputStream(userFileForJob);
|
|
|
+ BufferedWriter writer = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ writer = new BufferedWriter(new OutputStreamWriter(fout));
|
|
|
+ writer.write(ugi.getUserName() + "\n");
|
|
|
+ } finally {
|
|
|
+ if (writer != null) {
|
|
|
+ writer.close();
|
|
|
+ }
|
|
|
+ fout.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.info("Job " + jobId + " user info persisted to file : " + userFileForJob);
|
|
|
+ }
|
|
|
+
|
|
|
+ JobInProgress job = null;
|
|
|
+ try {
|
|
|
+ job = new JobInProgress(jobId, this, this.conf, ugi.getUserName(), 0);
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (userFileForJob != null) {
|
|
|
+ userFileForJob.delete();
|
|
|
+ }
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
|
|
|
String queue = job.getProfile().getQueueName();
|
|
|
if(!(queueManager.getQueues().contains(queue))) {
|
|
|
new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
|
|
|
+ job.fail();
|
|
|
+ if (userFileForJob != null) {
|
|
|
+ userFileForJob.delete();
|
|
|
+ }
|
|
|
throw new IOException("Queue \"" + queue + "\" does not exist");
|
|
|
}
|
|
|
|
|
@@ -3020,6 +3138,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
} catch (IOException ioe) {
|
|
|
LOG.warn("Access denied for user " + job.getJobConf().getUser()
|
|
|
+ ". Ignoring job " + jobId, ioe);
|
|
|
+ job.fail();
|
|
|
+ if (userFileForJob != null) {
|
|
|
+ userFileForJob.delete();
|
|
|
+ }
|
|
|
new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
|
|
|
throw ioe;
|
|
|
}
|