|
@@ -46,6 +46,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
+import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -123,6 +125,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
final static FsPermission SYSTEM_DIR_PERMISSION =
|
|
|
FsPermission.createImmutable((short) 0733); // rwx-wx-wx
|
|
|
|
|
|
+ // system files should have 700 permission
|
|
|
+ final static FsPermission SYSTEM_FILE_PERMISSION =
|
|
|
+ FsPermission.createImmutable((short) 0700); // rwx------
|
|
|
+
|
|
|
/**
|
|
|
* A client tried to submit a job before the Job Tracker was ready.
|
|
|
*/
|
|
@@ -672,6 +678,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
Set<JobID> jobsToRecover; // set of jobs to be recovered
|
|
|
|
|
|
private int totalEventsRecovered = 0;
|
|
|
+ private int restartCount = 0;
|
|
|
+ private boolean shouldRecover = false;
|
|
|
|
|
|
Set<String> recoveredTrackers =
|
|
|
Collections.synchronizedSet(new HashSet<String>());
|
|
@@ -850,7 +858,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
|
|
|
public boolean shouldRecover() {
|
|
|
- return jobsToRecover.size() != 0;
|
|
|
+ return shouldRecover;
|
|
|
}
|
|
|
|
|
|
public boolean shouldSchedule() {
|
|
@@ -888,18 +896,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
|
|
|
// checks if the job dir has the required files
|
|
|
public void checkAndAddJob(FileStatus status) throws IOException {
|
|
|
- String jobName = status.getPath().getName();
|
|
|
- if (isJobNameValid(jobName)) {
|
|
|
+ String fileName = status.getPath().getName();
|
|
|
+ if (isJobNameValid(fileName)) {
|
|
|
if (JobClient.isJobDirValid(status.getPath(), fs)) {
|
|
|
- recoveryManager.addJobForRecovery(JobID.forName(jobName));
|
|
|
+ recoveryManager.addJobForRecovery(JobID.forName(fileName));
|
|
|
+ shouldRecover = true; // enable actual recovery if num-files > 1
|
|
|
} else {
|
|
|
- LOG.info("Found an incomplete job directory " + jobName + "."
|
|
|
+ LOG.info("Found an incomplete job directory " + fileName + "."
|
|
|
+ " Deleting it!!");
|
|
|
fs.delete(status.getPath(), true);
|
|
|
}
|
|
|
- } else {
|
|
|
- LOG.info("Deleting " + status.getPath());
|
|
|
- fs.delete(status.getPath(), true);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -918,8 +924,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
// Set the start/launch time only if there are recovered tasks
|
|
|
// Increment the job's restart count
|
|
|
jip.updateJobInfo(job.getLong(JobHistory.Keys.SUBMIT_TIME),
|
|
|
- job.getLong(JobHistory.Keys.LAUNCH_TIME),
|
|
|
- job.getInt(Keys.RESTART_COUNT) + 1);
|
|
|
+ job.getLong(JobHistory.Keys.LAUNCH_TIME));
|
|
|
|
|
|
// Save the new job status
|
|
|
JobStatus newStatus = (JobStatus)jip.getStatus().clone();
|
|
@@ -1119,7 +1124,84 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
expireLaunchingTasks.removeTask(attemptId);
|
|
|
}
|
|
|
|
|
|
+ Path getRestartCountFile() {
|
|
|
+ return new Path(getSystemDir(), "jobtracker.info");
|
|
|
+ }
|
|
|
+
|
|
|
+ Path getTempRestartCountFile() {
|
|
|
+ return new Path(getSystemDir(), "jobtracker.info.recover");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Initialize the recovery process. It simply creates a jobtracker.info file
|
|
|
+ * in the jobtracker's system directory and writes its restart count in it.
|
|
|
+ * For the first start, the jobtracker writes '0' in it. Upon subsequent
|
|
|
+ * restarts the jobtracker replaces the count with its current count which
|
|
|
+ * is (old count + 1). The whole purpose of this api is to obtain restart
|
|
|
+ * counts across restarts to avoid attempt-id clashes.
|
|
|
+ *
|
|
|
+ * Note that in between if the jobtracker.info files goes missing then the
|
|
|
+ * jobtracker will disable recovery and continue.
|
|
|
+ *
|
|
|
+ */
|
|
|
+ void updateRestartCount() throws IOException {
|
|
|
+ Path restartFile = getRestartCountFile();
|
|
|
+ Path tmpRestartFile = getTempRestartCountFile();
|
|
|
+ FileSystem fs = restartFile.getFileSystem(conf);
|
|
|
+ FsPermission filePerm = new FsPermission(SYSTEM_FILE_PERMISSION);
|
|
|
+
|
|
|
+ // read the count from the jobtracker info file
|
|
|
+ if (fs.exists(restartFile)) {
|
|
|
+ fs.delete(tmpRestartFile, false); // delete the tmp file
|
|
|
+ } else if (fs.exists(tmpRestartFile)) {
|
|
|
+ // if .rec exists then delete the main file and rename the .rec to main
|
|
|
+ fs.rename(tmpRestartFile, restartFile); // rename .rec to main file
|
|
|
+ } else {
|
|
|
+ // For the very first time the jobtracker will create a jobtracker.info
|
|
|
+ // file. If the jobtracker has restarted then disable recovery as files'
|
|
|
+ // needed for recovery are missing.
|
|
|
+
|
|
|
+ // disable recovery if this is a restart
|
|
|
+ shouldRecover = false;
|
|
|
+
|
|
|
+ // write the jobtracker.info file
|
|
|
+ FSDataOutputStream out = FileSystem.create(fs, restartFile, filePerm);
|
|
|
+ out.writeInt(0);
|
|
|
+ out.close();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ FSDataInputStream in = fs.open(restartFile);
|
|
|
+ // read the old count
|
|
|
+ restartCount = in.readInt();
|
|
|
+ ++restartCount; // increment the restart count
|
|
|
+ in.close();
|
|
|
+
|
|
|
+ // Write back the new restart count and rename the old info file
|
|
|
+ //TODO This is similar to jobhistory recovery, maybe this common code
|
|
|
+ // can be factored out.
|
|
|
+
|
|
|
+ // write to the tmp file
|
|
|
+ FSDataOutputStream out = FileSystem.create(fs, tmpRestartFile, filePerm);
|
|
|
+ out.writeInt(restartCount);
|
|
|
+ out.close();
|
|
|
+
|
|
|
+ // delete the main file
|
|
|
+ fs.delete(restartFile, false);
|
|
|
+
|
|
|
+ // rename the .rec to main file
|
|
|
+ fs.rename(tmpRestartFile, restartFile);
|
|
|
+ }
|
|
|
+
|
|
|
public void recover() {
|
|
|
+ if (!shouldRecover()) {
|
|
|
+ // clean up jobs structure
|
|
|
+ jobsToRecover.clear();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.info("Restart count of the jobtracker : " + restartCount);
|
|
|
+
|
|
|
// I. Init the jobs and cache the recovered job history filenames
|
|
|
Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
|
|
|
Iterator<JobID> idIter = jobsToRecover.iterator();
|
|
@@ -1128,7 +1210,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
LOG.info("Trying to recover details of job " + id);
|
|
|
try {
|
|
|
// 1. Create the job object
|
|
|
- JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
|
|
|
+ JobInProgress job =
|
|
|
+ new JobInProgress(id, JobTracker.this, conf, restartCount);
|
|
|
|
|
|
// 2. Check if the user has appropriate access
|
|
|
// Get the user group info for the job's owner
|
|
@@ -1209,8 +1292,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
// 3. Close the listener
|
|
|
listener.close();
|
|
|
|
|
|
- LOG.info("Restart count for job " + id + " is " + pJob.numRestarts());
|
|
|
-
|
|
|
// 4. Update the recovery metric
|
|
|
totalEventsRecovered += listener.getNumEventsRecovered();
|
|
|
|
|
@@ -1532,7 +1613,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
// Make sure that the backup data is preserved
|
|
|
FileStatus[] systemDirData = fs.listStatus(this.systemDir);
|
|
|
- LOG.info("Cleaning up the system directory");
|
|
|
// Check if the history is enabled .. as we cant have persistence with
|
|
|
// history disabled
|
|
|
if (conf.getBoolean("mapred.jobtracker.restart.recover", false)
|
|
@@ -1553,6 +1633,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
break; // if there is something to recover else clean the sys dir
|
|
|
}
|
|
|
}
|
|
|
+ LOG.info("Cleaning up the system directory");
|
|
|
fs.delete(systemDir, true);
|
|
|
if (FileSystem.mkdirs(fs, systemDir,
|
|
|
new FsPermission(SYSTEM_DIR_PERMISSION))) {
|
|
@@ -1569,6 +1650,24 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
}
|
|
|
Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
|
|
|
}
|
|
|
+
|
|
|
+ // Prepare for recovery. This is done irrespective of the status of restart
|
|
|
+ // flag.
|
|
|
+ try {
|
|
|
+ recoveryManager.updateRestartCount();
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ LOG.warn("Failed to initialize recovery manager. The Recovery manager "
|
|
|
+ + "failed to access the system files in the system dir ("
|
|
|
+ + getSystemDir() + ").");
|
|
|
+ LOG.warn("It might be because the JobTracker failed to read/write system"
|
|
|
+ + " files (" + recoveryManager.getRestartCountFile() + " / "
|
|
|
+ + recoveryManager.getTempRestartCountFile() + ") or the system "
|
|
|
+ + " file " + recoveryManager.getRestartCountFile()
|
|
|
+ + " is missing!");
|
|
|
+ LOG.warn("Bailing out...");
|
|
|
+ throw ioe;
|
|
|
+ }
|
|
|
+
|
|
|
// Same with 'localDir' except it's always on the local disk.
|
|
|
jobConf.deleteLocalFiles(SUBDIR);
|
|
|
|