|
@@ -167,8 +167,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
// in a second
|
|
|
static final String JT_HEARTBEATS_IN_SECOND = "mapred.heartbeats.in.second";
|
|
|
private int NUM_HEARTBEATS_IN_SECOND;
|
|
|
- private final int DEFAULT_NUM_HEARTBEATS_IN_SECOND = 100;
|
|
|
- private final int MIN_NUM_HEARTBEATS_IN_SECOND = 1;
|
|
|
+ private static final int DEFAULT_NUM_HEARTBEATS_IN_SECOND = 100;
|
|
|
+ private static final int MIN_NUM_HEARTBEATS_IN_SECOND = 1;
|
|
|
|
|
|
// Scaling factor for heartbeats, used for testing only
|
|
|
static final String JT_HEARTBEATS_SCALING_FACTOR =
|
|
@@ -220,6 +220,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
* A client tried to submit a job before the Job Tracker was ready.
|
|
|
*/
|
|
|
public static class IllegalStateException extends IOException {
|
|
|
+
|
|
|
+ private static final long serialVersionUID = 1L;
|
|
|
+
|
|
|
public IllegalStateException(String msg) {
|
|
|
super(msg);
|
|
|
}
|
|
@@ -1620,6 +1623,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
fs.rename(tmpRestartFile, restartFile);
|
|
|
}
|
|
|
|
|
|
+ // mapred.JobID::forName returns
|
|
|
+ @SuppressWarnings("unchecked") // mapreduce.JobID
|
|
|
public void recover() {
|
|
|
if (!shouldRecover()) {
|
|
|
// clean up jobs structure
|
|
@@ -1674,8 +1679,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
/* THIS PART OF THE CODE IS USELESS. JOB RECOVERY SHOULD BE
|
|
|
* BACKPORTED (MAPREDUCE-873)
|
|
|
*/
|
|
|
- job = new JobInProgress(JobTracker.this, conf, null,
|
|
|
- restartCount, new Credentials() /*HACK*/);
|
|
|
+ job = new JobInProgress(JobTracker.this, conf,
|
|
|
+ new JobInfo((org.apache.hadoop.mapreduce.JobID) id,
|
|
|
+ new Text(user), new Path(getStagingAreaDirInternal(user))),
|
|
|
+ restartCount, new Credentials() /*HACK*/);
|
|
|
|
|
|
// 2. Check if the user has appropriate access
|
|
|
// Get the user group info for the job's owner
|
|
@@ -3696,16 +3703,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
*/
|
|
|
public String getStagingAreaDir() throws IOException {
|
|
|
try{
|
|
|
- final String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
|
|
+ final String user =
|
|
|
+ UserGroupInformation.getCurrentUser().getShortUserName();
|
|
|
return mrOwner.doAs(new PrivilegedExceptionAction<String>() {
|
|
|
@Override
|
|
|
public String run() throws Exception {
|
|
|
- Path stagingRootDir =
|
|
|
- new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
|
|
|
- "/tmp/hadoop/mapred/staging"));
|
|
|
- FileSystem fs = stagingRootDir.getFileSystem(conf);
|
|
|
- return fs.makeQualified(new Path(stagingRootDir,
|
|
|
- user+"/.staging")).toString();
|
|
|
+ return getStagingAreaDirInternal(user);
|
|
|
}
|
|
|
});
|
|
|
} catch(InterruptedException ie) {
|
|
@@ -3713,6 +3716,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private String getStagingAreaDirInternal(String user) throws IOException {
|
|
|
+ final Path stagingRootDir =
|
|
|
+ new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
|
|
|
+ "/tmp/hadoop/mapred/staging"));
|
|
|
+ final FileSystem fs = stagingRootDir.getFileSystem(conf);
|
|
|
+ return fs.makeQualified(new Path(stagingRootDir,
|
|
|
+ user+"/.staging")).toString();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Adds a job to the jobtracker. Make sure that the checks are inplace before
|
|
|
* adding a job. This is the core job submission logic
|
|
@@ -4140,13 +4152,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
Vector<TaskInProgress> completeTasks = job.reportCleanupTIPs(true);
|
|
|
for (Iterator<TaskInProgress> it = completeTasks.iterator();
|
|
|
it.hasNext();) {
|
|
|
- TaskInProgress tip = (TaskInProgress) it.next();
|
|
|
+ TaskInProgress tip = it.next();
|
|
|
reports.add(tip.generateSingleReport());
|
|
|
}
|
|
|
Vector<TaskInProgress> incompleteTasks = job.reportCleanupTIPs(false);
|
|
|
for (Iterator<TaskInProgress> it = incompleteTasks.iterator();
|
|
|
it.hasNext();) {
|
|
|
- TaskInProgress tip = (TaskInProgress) it.next();
|
|
|
+ TaskInProgress tip = it.next();
|
|
|
reports.add(tip.generateSingleReport());
|
|
|
}
|
|
|
return reports.toArray(new TaskReport[reports.size()]);
|
|
@@ -4169,13 +4181,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
Vector<TaskInProgress> completeTasks = job.reportSetupTIPs(true);
|
|
|
for (Iterator<TaskInProgress> it = completeTasks.iterator();
|
|
|
it.hasNext();) {
|
|
|
- TaskInProgress tip = (TaskInProgress) it.next();
|
|
|
+ TaskInProgress tip = it.next();
|
|
|
reports.add(tip.generateSingleReport());
|
|
|
}
|
|
|
Vector<TaskInProgress> incompleteTasks = job.reportSetupTIPs(false);
|
|
|
for (Iterator<TaskInProgress> it = incompleteTasks.iterator();
|
|
|
it.hasNext();) {
|
|
|
- TaskInProgress tip = (TaskInProgress) it.next();
|
|
|
+ TaskInProgress tip = it.next();
|
|
|
reports.add(tip.generateSingleReport());
|
|
|
}
|
|
|
return reports.toArray(new TaskReport[reports.size()]);
|
|
@@ -4761,7 +4773,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
jobStatusList.add(status);
|
|
|
}
|
|
|
}
|
|
|
- return (JobStatus[]) jobStatusList.toArray(
|
|
|
+ return jobStatusList.toArray(
|
|
|
new JobStatus[jobStatusList.size()]);
|
|
|
}
|
|
|
|