@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -45,6 +46,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AccessControlException;
@@ -53,6 +55,9 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.mapred.JobHistory.Keys;
+import org.apache.hadoop.mapred.JobHistory.Listener;
+import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -389,6 +394,501 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   }

+  ///////////////////////////////////////////////////////
+  // Used to recover the jobs upon restart
+  ///////////////////////////////////////////////////////
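+  // The manager replays each recovered job's history log through a
+  // JobRecoveryListener, recreating tasks and attempts in the order
+  // in which they originally ran.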
+  class RecoveryManager {
+    Set<JobID> jobsToRecover; // set of jobs to be recovered
+
+    private int totalEventsRecovered = 0;
+
+    /** A custom listener that replays the events in the order in which the
+     * events (task attempts) occurred.
+     */
+    class JobRecoveryListener implements Listener {
+      // The owner job
+      private JobInProgress jip;
+
+      private JobHistory.JobInfo job; // current job's info object
+
+      // Maintain the count of the (attempt) events recovered
+      private int numEventsRecovered = 0;
+
+      // Maintains open transactions
+      private Map<String, String> hangingAttempts =
+        new HashMap<String, String>();
+
+      // Whether there are any updates for this job
+      private boolean hasUpdates = false;
+
+      public JobRecoveryListener(JobInProgress jip) {
+        this.jip = jip;
+        this.job = new JobHistory.JobInfo(jip.getJobID().toString());
+      }
+
+      /**
+       * Process a task. Note that a task might commit a previously pending
+       * transaction.
+       */
+      private void processTask(String taskId, JobHistory.Task task) {
+        // Any TASK info commits the previous transaction
+        boolean hasHanging = hangingAttempts.remove(taskId) != null;
+        if (hasHanging) {
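+          // A committed transaction accounts for the attempt's two history
+          // events: its start record and its finish record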
+          numEventsRecovered += 2;
+        }
+
+        TaskID id = TaskID.forName(taskId);
+        TaskInProgress tip = getTip(id);
+
+        updateTip(tip, task);
+      }
+
+      /**
+       * Adds a task attempt to the listener.
+       */
+      private void processTaskAttempt(String taskAttemptId,
+                                      JobHistory.TaskAttempt attempt) {
+        TaskAttemptID id = TaskAttemptID.forName(taskAttemptId);
+
+        // Check if the transaction for this attempt can be committed
+        String taskStatus = attempt.get(Keys.TASK_STATUS);
+
+        if (taskStatus.length() > 0) {
+          // This means this is an update event
+          if (taskStatus.equals(Values.SUCCESS.name())) {
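+            // A successful attempt is left 'hanging' (an open transaction)
+            // until a later record for its owning task commits it in
+            // processTask()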
+            // Mark this attempt as hanging
+            hangingAttempts.put(id.getTaskID().toString(), taskAttemptId);
+            addSuccessfulAttempt(jip, id, attempt);
+          } else {
+            addUnsuccessfulAttempt(jip, id, attempt);
+            numEventsRecovered += 2;
+          }
+        } else {
+          createTaskAttempt(jip, id, attempt);
+        }
+      }
+
+      public void handle(JobHistory.RecordTypes recType, Map<Keys,
+                         String> values) throws IOException {
+        if (recType == JobHistory.RecordTypes.Job) {
+          // Update the meta-level job information
+          job.handle(values);
+
+          // Forcefully init the job as we have some updates for it
+          checkAndInit();
+        } else if (recType.equals(JobHistory.RecordTypes.Task)) {
+          String taskId = values.get(Keys.TASKID);
+
+          // Create a task
+          JobHistory.Task task = new JobHistory.Task();
+          task.handle(values);
+
+          // Ignore if it's a cleanup task
+          if (isCleanup(task)) {
+            return;
+          }
+
+          // Process the task, i.e. update the tip state
+          processTask(taskId, task);
+        } else if (recType.equals(JobHistory.RecordTypes.MapAttempt)) {
+          String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
+
+          // Create a task attempt
+          JobHistory.MapAttempt attempt = new JobHistory.MapAttempt();
+          attempt.handle(values);
+
+          // Ignore if it's a cleanup task
+          if (isCleanup(attempt)) {
+            return;
+          }
+
+          // Process the attempt, i.e. update the attempt state via the job
+          processTaskAttempt(attemptId, attempt);
+        } else if (recType.equals(JobHistory.RecordTypes.ReduceAttempt)) {
+          String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
+
+          // Create a task attempt
+          JobHistory.ReduceAttempt attempt = new JobHistory.ReduceAttempt();
+          attempt.handle(values);
+
+          // Ignore if it's a cleanup task
+          if (isCleanup(attempt)) {
+            return;
+          }
+
+          // Process the attempt, i.e. update the job state via the job
+          processTaskAttempt(attemptId, attempt);
+        }
+      }
+
+      // Check if the task is of type CLEANUP
+      private boolean isCleanup(JobHistory.Task task) {
+        String taskType = task.get(Keys.TASK_TYPE);
+        return Values.CLEANUP.name().equals(taskType);
+      }
+
+      // Init the job if it's ready for init. Also make sure that the
+      // scheduler is updated
+      private void checkAndInit() throws IOException {
+        String jobStatus = this.job.get(Keys.JOB_STATUS);
+        if (Values.RUNNING.name().equals(jobStatus)) {
+          hasUpdates = true;
+          LOG.info("Calling init from RM for job " + jip.getJobID().toString());
+          jip.initTasks();
+          updateJobListeners();
+        }
+      }
+
+      private void updateJobListeners() {
+        // The scheduler needs to be informed as the recovery-manager
+        // has initialized the job
+        for (JobInProgressListener listener : jobInProgressListeners) {
+          listener.jobUpdated(jip);
+        }
+      }
+
+      void close() {
+        if (hasUpdates) {
+          // Apply the final (job-level) updates
+          updateJob(jip, job);
+          // Update the job listeners as the start/submit time and the job
+          // priority may have changed
+          updateJobListeners();
+        }
+      }
+
+      public int getNumEventsRecovered() {
+        return numEventsRecovered;
+      }
+
+    }
+
+    public RecoveryManager() {
+      jobsToRecover = new TreeSet<JobID>();
+    }
+
+    public boolean contains(JobID id) {
+      return jobsToRecover.contains(id);
+    }
+
+    void addJobForRecovery(JobID id) {
+      jobsToRecover.add(id);
+    }
+
+    public boolean shouldRecover() {
+      return jobsToRecover.size() != 0;
+    }
+
+    // Checks if the job dir has the required files
+    public void checkAndAddJob(FileStatus status) throws IOException {
+      String jobName = status.getPath().getName();
+      if (JobID.isJobNameValid(jobName)) {
+        if (JobClient.isJobDirValid(status.getPath(), fs)) {
+          recoveryManager.addJobForRecovery(JobID.forName(jobName));
+        } else {
+          LOG.info("Found an incomplete job directory " + jobName + "."
+                   + " Deleting it!!");
+          fs.delete(status.getPath(), true);
+        }
+      } else {
+        LOG.info("Deleting " + status.getPath());
+        fs.delete(status.getPath(), true);
+      }
+    }
+
+    private void updateJob(JobInProgress jip, JobHistory.JobInfo job) {
+      // Set the start/launch time only if there are recovered tasks
+      jip.updateJobTime(job.getLong(JobHistory.Keys.SUBMIT_TIME),
+                        job.getLong(JobHistory.Keys.LAUNCH_TIME));
+
+      // Change the job priority
+      String jobpriority = job.get(Keys.JOB_PRIORITY);
+      if (jobpriority.length() > 0) {
+        JobPriority priority = JobPriority.valueOf(jobpriority);
+        // It's important to update this via the jobtracker's API
+        setJobPriority(jip.getJobID(), priority);
+      }
+    }
+
+    private void updateTip(TaskInProgress tip, JobHistory.Task task) {
+      long startTime = task.getLong(Keys.START_TIME);
+      if (startTime != 0) {
+        tip.setExecStartTime(startTime);
+      }
+
+      long finishTime = task.getLong(Keys.FINISH_TIME);
+      // For failed tasks finish-time will be missing
+      if (finishTime != 0) {
+        tip.setExecFinishTime(finishTime);
+      }
+
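+      // Only FAILED task-level records carry an attempt id: the named
+      // attempt is the one that caused the task to fail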
+      String cause = task.get(Keys.TASK_ATTEMPT_ID);
+      if (cause.length() > 0) {
+        // This means that this is a FAILED event
+        TaskAttemptID id = TaskAttemptID.forName(cause);
+        TaskStatus status = tip.getTaskStatus(id);
+        // This will add the tip-failed event in the new log
+        tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(),
+                                status.getPhase(), status.getRunState(),
+                                status.getTaskTracker(), myInstrumentation);
+      }
+    }
+
+    private void createTaskAttempt(JobInProgress job,
+                                   TaskAttemptID attemptId,
+                                   JobHistory.TaskAttempt attempt) {
+      TaskID id = attemptId.getTaskID();
+      String type = attempt.get(Keys.TASK_TYPE);
+      TaskInProgress tip = job.getTaskInProgress(id);
+
+      // I. Get the required info
+      TaskStatus taskStatus = null;
+      String trackerName = attempt.get(Keys.TRACKER_NAME);
+      String trackerHostName =
+        JobInProgress.convertTrackerNameToHostName(trackerName);
+      int index = trackerHostName.indexOf("_");
+      trackerHostName =
+        trackerHostName.substring(index + 1, trackerHostName.length());
+      int port = attempt.getInt(Keys.HTTP_PORT);
+
+      long attemptStartTime = attempt.getLong(Keys.START_TIME);
+
+      // II. Create the (appropriate) task status
+      if (type.equals(Values.MAP.name())) {
+        taskStatus =
+          new MapTaskStatus(attemptId, 0.0f, TaskStatus.State.RUNNING,
+                            "", "", trackerName, TaskStatus.Phase.MAP,
+                            new Counters());
+      } else {
+        taskStatus =
+          new ReduceTaskStatus(attemptId, 0.0f, TaskStatus.State.RUNNING,
+                               "", "", trackerName, TaskStatus.Phase.REDUCE,
+                               new Counters());
+      }
+
+      // Set the start time
+      taskStatus.setStartTime(attemptStartTime);
+
+      List<TaskStatus> ttStatusList = new ArrayList<TaskStatus>();
+      ttStatusList.add(taskStatus);
+
+      // III. Create the dummy tasktracker status
+      TaskTrackerStatus ttStatus =
+        new TaskTrackerStatus(trackerName, trackerHostName, port, ttStatusList,
+                              0, 0, 0);
+      ttStatus.setLastSeen(System.currentTimeMillis());
+
+      // IV. Register a new tracker
+      boolean isTrackerRegistered = getTaskTracker(trackerName) != null;
+      if (!isTrackerRegistered) {
+        addNewTracker(ttStatus);
+      }
+
+      // V. Update the tracker status
+      // This will update the meta info of the jobtracker and also add the
+      // tracker status if missing, i.e. register it
+      updateTaskTrackerStatus(trackerName, ttStatus);
+
+      // VI. Register the attempt
+      // a) In the job
+      job.addRunningTaskToTIP(tip, attemptId, ttStatus, false);
+      // b) In the tip
+      tip.updateStatus(taskStatus);
+
+      // VII. Make an entry in the launched tasks
+      expireLaunchingTasks.addNewTask(attemptId);
+    }
+
+    private void addSuccessfulAttempt(JobInProgress job,
+                                      TaskAttemptID attemptId,
+                                      JobHistory.TaskAttempt attempt) {
+      // I. Get the required info
+      TaskID taskId = attemptId.getTaskID();
+      String type = attempt.get(Keys.TASK_TYPE);
+
+      TaskInProgress tip = job.getTaskInProgress(taskId);
+      long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
+
+      // Get the task status and the tracker name and make a copy of it
+      TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
+      taskStatus.setFinishTime(attemptFinishTime);
+
+      String stateString = attempt.get(Keys.STATE_STRING);
+
+      // Update the basic values
+      taskStatus.setStateString(stateString);
+      taskStatus.setProgress(1.0f);
+      taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+
+      // Set the shuffle/sort finished times
+      if (type.equals(Values.REDUCE.name())) {
+        long shuffleTime =
+          Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED));
+        long sortTime =
+          Long.parseLong(attempt.get(Keys.SORT_FINISHED));
+        taskStatus.setShuffleFinishTime(shuffleTime);
+        taskStatus.setSortFinishTime(sortTime);
+      }
+
+      // Add the counters
+      String counterString = attempt.get(Keys.COUNTERS);
+      Counters counter = null;
+      // TODO: Check if an exception should be thrown
+      try {
+        counter = Counters.fromEscapedCompactString(counterString);
+      } catch (ParseException pe) {
+        counter = new Counters(); // Set it to an empty counter
+      }
+      taskStatus.setCounters(counter);
+
+      // II. Replay the status
+      job.updateTaskStatus(tip, taskStatus, myInstrumentation);
+
+      // III. Prevent the task from expiring
+      expireLaunchingTasks.removeTask(attemptId);
+    }
+
+    private void addUnsuccessfulAttempt(JobInProgress job,
+                                        TaskAttemptID attemptId,
+                                        JobHistory.TaskAttempt attempt) {
+      // I. Get the required info
+      TaskID taskId = attemptId.getTaskID();
+      TaskInProgress tip = job.getTaskInProgress(taskId);
+      long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
+
+      TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
+      taskStatus.setFinishTime(attemptFinishTime);
+
+      // Reset the progress
+      taskStatus.setProgress(0.0f);
+
+      String stateString = attempt.get(Keys.STATE_STRING);
+      taskStatus.setStateString(stateString);
+
+      boolean hasFailed =
+        attempt.get(Keys.TASK_STATUS).equals(Values.FAILED.name());
+      // Set the state to failed/killed
+      if (hasFailed) {
+        taskStatus.setRunState(TaskStatus.State.FAILED);
+      } else {
+        taskStatus.setRunState(TaskStatus.State.KILLED);
+      }
+
+      // Get/set the error message
+      String diagInfo = attempt.get(Keys.ERROR);
+      taskStatus.setDiagnosticInfo(diagInfo); // diag info
+
+      // II. Update the task status
+      job.updateTaskStatus(tip, taskStatus, myInstrumentation);
+
+      // III. Prevent the task from expiring
+      expireLaunchingTasks.removeTask(attemptId);
+    }
+
+    public void recover() throws IOException {
+      // I. Init the jobs and cache the recovered job history filenames
+      Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
+      for (JobID id : jobsToRecover) {
+        // 1. Create the job object
+        JobInProgress job = new JobInProgress(id, JobTracker.this, conf);
+
+        // 2. Get the log file and the file path
+        String logFileName =
+          JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
+        Path jobHistoryFilePath =
+          JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
+
+        // 3. Recover the history file. This involves
+        //     - deleting file.recover if file exists
+        //     - renaming file.recover to file if file doesn't exist
+        //    This makes sure that the (master) file exists
+        JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(),
+                                                 jobHistoryFilePath);
+
+        // 4. Cache the history file name as it costs one DFS access
+        jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
+
+        // 5. Submit the job to the jobtracker
+        addJob(id, job);
+      }
+
+      long recoveryStartTime = System.currentTimeMillis();
+
+      // II. Recover each job
+      for (JobID id : jobsToRecover) {
+        JobInProgress pJob = getJob(id);
+
+        // 1. Get the required info
+        // Get the recovered history file
+        Path jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());
+        String logFileName = jobHistoryFilePath.getName();
+
+        FileSystem fs = jobHistoryFilePath.getFileSystem(conf);
+
+        // 2. Parse the history file
+        // Note that this also involves job update
+        JobRecoveryListener listener = new JobRecoveryListener(pJob);
+        try {
+          JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(),
+                                        listener, fs);
+        } catch (IOException e) {
+          LOG.info("JobTracker failed to recover job " + pJob + "."
+                   + " Ignoring it.", e);
+          continue;
+        }
+
+        // 3. Close the listener
+        listener.close();
+
+        // 4. Update the recovery metric
+        totalEventsRecovered += listener.getNumEventsRecovered();
+
+        // 5. Cleanup history
+        // Delete the master log file as an indication that the new file
+        // should be used in future
+        synchronized (pJob) {
+          JobHistory.JobInfo.checkpointRecovery(logFileName,
+                                                pJob.getJobConf());
+        }
+
+        // 6. Inform the jobtracker as to how much of the data is recovered.
+        // This is done so that the TTs can roll back to account for lost
+        // updates
+        lastSeenEventMapOnRestart.put(pJob.getStatus().getJobID(),
+                                      pJob.getNumTaskCompletionEvents());
+      }
+
+      recoveryDuration = System.currentTimeMillis() - recoveryStartTime;
+      hasRecovered = true;
+
+      // III. Finalize the recovery
+      // Make sure that the tracker statuses in the expiry-tracker queue
+      // are updated
+      long now = System.currentTimeMillis();
+      int size = trackerExpiryQueue.size();
+      for (int i = 0; i < size; ++i) {
+        // Get the first status
+        TaskTrackerStatus status = trackerExpiryQueue.first();
+
+        // Remove it
+        trackerExpiryQueue.remove(status);
+
+        // Set the new time
+        status.setLastSeen(now);
+
+        // Add back to get the sorted list
+        trackerExpiryQueue.add(status);
+      }
+
+      // IV. Cleanup
+      jobsToRecover.clear();
+      LOG.info("Restoration complete");
+    }
+
+    int totalEventsRecovered() {
+      return totalEventsRecovered;
+    }
+  }
+
   private JobTrackerInstrumentation myInstrumentation = null;

@@ -403,6 +903,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   private int totalMapTaskCapacity;
   private int totalReduceTaskCapacity;
   private HostsFileReader hostsReader;
+
+  // JobTracker recovery variables
+  private volatile boolean hasRestarted = false;
+  private volatile boolean hasRecovered = false;
+  private volatile long recoveryDuration;

   //
   // Properties to maintain while running Jobs and Tasks:
@@ -450,6 +955,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   Map<String, Node> hostnameToNodeMap =
     Collections.synchronizedMap(new TreeMap<String, Node>());

+  // A map from JobID to the last known task-completion-event-index on restart
+  Map<JobID, Integer> lastSeenEventMapOnRestart =
+    new HashMap<JobID, Integer>();
+
   // Number of resolved entries
   int numResolved;
@@ -472,6 +981,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,

   CompletedJobStatusStore completedJobStatusStore = null;
   Thread completedJobsStoreThread = null;
+  RecoveryManager recoveryManager;

   /**
    * It might seem like a bug to maintain a TreeSet of status objects,
@@ -548,7 +1058,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     this.port = addr.getPort();
     int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
     this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount, false, conf);
-    this.interTrackerServer.start();
     if (LOG.isDebugEnabled()) {
       Properties p = System.getProperties();
       for (Iterator it = p.keySet().iterator(); it.hasNext();) {
@@ -582,8 +1091,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
     infoServer.start();

-    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
-    trackerIdentifier = dateFormat.format(new Date());
+    trackerIdentifier = getDateFormat().format(new Date());

     Class<? extends JobTrackerInstrumentation> metricsInst = getInstrumentationClass(jobConf);
     try {
@@ -608,6 +1116,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
                                 infoBindAddress + ":" + this.infoPort);
     LOG.info("JobTracker webserver: " + this.infoServer.getPort());

+    // Create the recovery manager
+    recoveryManager = new RecoveryManager();
+
     while (true) {
       try {
         // if we haven't contacted the namenode go ahead and do it
@@ -619,6 +1130,24 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         if(systemDir == null) {
          systemDir = new Path(getSystemDir());
         }
+        // Make sure that the backup data is preserved
+        FileStatus[] systemDirData = fs.listStatus(this.systemDir);
+        LOG.info("Cleaning up the system directory");
+        // Check if history is enabled: we can't have persistence with
+        // history disabled
+        if (conf.getBoolean("mapred.jobtracker.restart.recover", false)
+            && !JobHistory.isDisableHistory()
+            && systemDirData != null) {
+          for (FileStatus status : systemDirData) {
+            recoveryManager.checkAndAddJob(status);
+          }
+
+          // Check if there are jobs to be recovered
+          hasRestarted = recoveryManager.shouldRecover();
+          if (hasRestarted) {
+            break; // there is something to recover; skip cleaning the sys dir
+          }
+        }
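+        // Nothing to recover: fall through and set up a clean system dir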
         fs.delete(systemDir, true);
         if (FileSystem.mkdirs(fs, systemDir,
             new FsPermission(SYSTEM_DIR_PERMISSION))) {
@@ -653,14 +1182,54 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
                                            DNSToSwitchMapping.class), conf);
     this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels",
                                           NetworkTopology.DEFAULT_HOST_LEVEL);
-    synchronized (this) {
-      state = State.RUNNING;
-    }

     //initializes the job status store
     completedJobStatusStore = new CompletedJobStatusStore(conf,fs);
+  }

-    LOG.info("Starting RUNNING");
+  private static SimpleDateFormat getDateFormat() {
+    return new SimpleDateFormat("yyyyMMddHHmm");
+  }
+
+  static boolean validateIdentifier(String id) {
+    try {
+      // the jobtracker id should be 'date' parseable
+      getDateFormat().parse(id);
+      return true;
+    } catch (ParseException pe) {}
+    return false;
+  }
+
+  static boolean validateJobNumber(String id) {
+    try {
+      // the job number should be integer parseable
+      Integer.parseInt(id);
+      return true;
+    } catch (IllegalArgumentException pe) {}
+    return false;
+  }
+
+  /**
+   * Whether the JT has restarted
+   */
+  public boolean hasRestarted() {
+    return hasRestarted;
+  }
+
+  /**
+   * Whether the JT has recovered upon restart
+   */
+  public boolean hasRecovered() {
+    return hasRecovered;
+  }
+
+  /**
+   * How long the jobtracker took to recover from restart.
+   */
+  public long getRecoveryDuration() {
+    return hasRestarted()
+           ? recoveryDuration
+           : 0;
   }

   public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
@@ -683,12 +1252,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    * Run forever
    */
   public void offerService() throws InterruptedException, IOException {
+    taskScheduler.start();
+
+    // Start the recovery after starting the scheduler
+    recoveryManager.recover();
+
     this.expireTrackersThread = new Thread(this.expireTrackers,
                                            "expireTrackers");
     this.expireTrackersThread.start();
     this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
     this.retireJobsThread.start();
-    taskScheduler.start();
     expireLaunchingTaskThread.start();

     if (completedJobStatusStore.isActive()) {
@@ -697,6 +1270,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       completedJobsStoreThread.start();
     }

+    // Start the inter-tracker server once the JT is ready
+    this.interTrackerServer.start();
+
+    synchronized (this) {
+      state = State.RUNNING;
+    }
+    LOG.info("Starting RUNNING");
+
     this.interTrackerServer.join();
     LOG.info("Stopped interTrackerServer");
   }
@@ -778,6 +1359,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,

     // taskid --> TIP
     taskidToTIPMap.put(taskid, tip);
+
+    // Note this launch
+    if (taskid.isMap()) {
+      myInstrumentation.launchMap(taskid);
+    } else {
+      myInstrumentation.launchReduce(taskid);
+    }
   }

   void removeTaskEntry(TaskAttemptID taskid) {
@@ -908,6 +1496,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,

     JobEndNotifier.registerNotification(job.getJobConf(), job.getStatus());

+    // start the merge of log files
+    JobID id = job.getStatus().getJobID();
+    try {
+      JobHistory.JobInfo.finalizeRecovery(id, job.getJobConf());
+    } catch (IOException ioe) {
+      LOG.info("Failed to finalize the log file recovery for job " + id, ioe);
+    }
+
     // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
     // in memory; information about the purged jobs is available via
     // JobHistory.
@@ -1055,6 +1652,21 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
   }

+  /**
+   * Adds a new node to the jobtracker. It involves adding it to the expiry
+   * thread and adding it for resolution.
+   * @param status Task Tracker's status
+   */
+  private void addNewTracker(TaskTrackerStatus status) {
+    trackerExpiryQueue.add(status);
+    // Register the tracker if it's not registered
+    if (getNode(status.getTrackerName()) == null) {
+      // Making the network location resolution inline ..
+      resolveAndAddToTopology(status.getHost());
+    }
+  }
+
   public Node resolveAndAddToTopology(String name) {
     List <String> tmpList = new ArrayList<String>(1);
     tmpList.add(name);
@@ -1168,6 +1780,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,

     HeartbeatResponse prevHeartbeatResponse =
       trackerToHeartbeatResponseMap.get(trackerName);
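+    // Whether to piggyback the last known task-completion-event indices
+    // on the response (set when a tracker reports in after a JT restart)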
+    boolean addRestartInfo = false;

     if (initialContact != true) {
       // If this isn't the 'initial contact' from the tasktracker,
@@ -1175,32 +1788,34 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       // no record of the 'previous heartbeat'; if so, ask the
       // tasktracker to re-initialize itself.
       if (prevHeartbeatResponse == null) {
-        LOG.warn("Serious problem, cannot find record of 'previous' " +
-                 "heartbeat for '" + trackerName +
-                 "'; reinitializing the tasktracker");
-        return new HeartbeatResponse(responseId,
-            new TaskTrackerAction[] {new ReinitTrackerAction()});
+        // This is the first heartbeat from the old tracker to the newly
+        // started JobTracker
+        if (hasRestarted()) {
+          addRestartInfo = true;
+        } else {
+          // Jobtracker might have restarted but no recovery is needed
+          LOG.warn("Serious problem, cannot find record of 'previous' " +
+                   "heartbeat for '" + trackerName +
+                   "'; reinitializing the tasktracker");
+          return new HeartbeatResponse(responseId,
+              new TaskTrackerAction[] {new ReinitTrackerAction()});
+        }

-      }
+      } else {

-      // It is completely safe to not process a 'duplicate' heartbeat from a
-      // {@link TaskTracker} since it resends the heartbeat when rpcs are lost -
-      // @see {@link TaskTracker.transmitHeartbeat()};
-      // acknowledge it by re-sending the previous response to let the
-      // {@link TaskTracker} go forward.
-      if (prevHeartbeatResponse.getResponseId() != responseId) {
-        LOG.info("Ignoring 'duplicate' heartbeat from '" +
-                 trackerName + "'; resending the previous 'lost' response");
-        return prevHeartbeatResponse;
+        // It is completely safe to not process a 'duplicate' heartbeat from a
+        // {@link TaskTracker} since it resends the heartbeat when rpcs are
+        // lost; see {@link TaskTracker.transmitHeartbeat()};
+        // acknowledge it by re-sending the previous response to let the
+        // {@link TaskTracker} go forward.
+        if (prevHeartbeatResponse.getResponseId() != responseId) {
+          LOG.info("Ignoring 'duplicate' heartbeat from '" +
+                   trackerName + "'; resending the previous 'lost' response");
+          return prevHeartbeatResponse;
+        }
       }
     }

-    // Register the tracker if its not registered
-    if (getNode(trackerName) == null) {
-      // Making the network location resolution inline ..
-      resolveAndAddToTopology(status.getHost());
-    }
-
     // Process this heartbeat
     short newResponseId = (short)(responseId + 1);
     if (!processHeartbeat(status, initialContact)) {
@@ -1229,11 +1844,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       if (tasks != null) {
         for (Task task : tasks) {
           expireLaunchingTasks.addNewTask(task.getTaskID());
-          if (task.isMapTask()) {
-            myInstrumentation.launchMap(task.getTaskID());
-          } else {
-            myInstrumentation.launchReduce(task.getTaskID());
-          }
           LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
           actions.add(new LaunchTaskAction(task));
         }
@@ -1258,6 +1868,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     response.setHeartbeatInterval(nextInterval);
     response.setActions(
                         actions.toArray(new TaskTrackerAction[actions.size()]));
+
+    // Check if the restart info is required
+    if (addRestartInfo) {
+      response.setLastKnownIndices(lastSeenEventMapOnRestart);
+    }

     // Update the trackerToHeartbeatResponseMap
     trackerToHeartbeatResponseMap.put(trackerName, response);
@@ -1388,7 +2003,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       }

       if (initialContact) {
-        trackerExpiryQueue.add(trackerStatus);
+        addNewTracker(trackerStatus);
       }
     }
   }
@@ -1526,20 +2141,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   // JobSubmissionProtocol
   ////////////////////////////////////////////////////

-  /**
-   * Make sure the JobTracker is done initializing.
-   */
-  private synchronized void ensureRunning() throws IllegalStateException {
-    if (state != State.RUNNING) {
-      throw new IllegalStateException("Job tracker still initializing");
-    }
-  }
-
   /**
    * Allocates a new JobId string.
    */
   public synchronized JobID getNewJobId() throws IOException {
-    ensureRunning();
     return new JobID(getTrackerIdentifier(), nextJobId++);
   }

@@ -1552,14 +2157,23 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    * the JobTracker alone.
    */
   public synchronized JobStatus submitJob(JobID jobId) throws IOException {
-    ensureRunning();
     if(jobs.containsKey(jobId)) {
       //job already running, don't start twice
       return jobs.get(jobId).getStatus();
     }

-    totalSubmissions++;
     JobInProgress job = new JobInProgress(jobId, this, this.conf);
+    return addJob(jobId, job);
+  }
+
+  /**
+   * Adds a job to the jobtracker. Make sure that the checks are in place
+   * before adding a job. This is the core job submission logic.
+   * @param jobId The id for the job submitted which needs to be added
+   * @param job the job object to add
+   */
+  private synchronized JobStatus addJob(JobID jobId, JobInProgress job)
+  throws IOException {
+    totalSubmissions++;
     checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);

     synchronized (jobs) {
@@ -1885,11 +2499,19 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         report.setTaskTracker(trackerName);
         TaskAttemptID taskId = report.getTaskID();
         TaskInProgress tip = taskidToTIPMap.get(taskId);
-        if (tip == null) {
-          LOG.info("Serious problem. While updating status, cannot find taskid " + report.getTaskID());
-        } else {
+        // Check if the tip is known to the jobtracker. In case of a restarted
+        // JT, some tasks might join in later
+        if (tip != null || hasRestarted()) {
+          if (tip == null) {
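+            // The tip is not in taskidToTIPMap after a restart; look it up
+            // via the job and re-register the running attempt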
+            JobInProgress job = getJob(taskId.getJobID());
+            tip = job.getTaskInProgress(taskId.getTaskID());
+            job.addRunningTaskToTIP(tip, taskId, status, false);
+          }
           expireLaunchingTasks.removeTask(taskId);
           tip.getJob().updateTaskStatus(tip, report, myInstrumentation);
+        } else {
+          LOG.info("Serious problem. While updating status, cannot find taskid "
+                   + report.getTaskID());
         }

         // Process 'failed fetch' notifications