|
@@ -22,7 +22,6 @@ import java.io.IOException;
|
|
|
import java.net.BindException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.UnknownHostException;
|
|
|
-import java.text.NumberFormat;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
@@ -37,7 +36,6 @@ import java.util.Map;
|
|
|
import java.util.Properties;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeMap;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.TreeSet;
|
|
|
import java.util.Vector;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
@@ -51,18 +49,18 @@ import org.apache.hadoop.fs.permission.AccessControlException;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
-import org.apache.hadoop.ipc.RPC.VersionMismatch;
|
|
|
import org.apache.hadoop.ipc.Server;
|
|
|
+import org.apache.hadoop.ipc.RPC.VersionMismatch;
|
|
|
import org.apache.hadoop.metrics.MetricsContext;
|
|
|
import org.apache.hadoop.metrics.MetricsRecord;
|
|
|
import org.apache.hadoop.metrics.MetricsUtil;
|
|
|
import org.apache.hadoop.metrics.Updater;
|
|
|
import org.apache.hadoop.metrics.jvm.JvmMetrics;
|
|
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
|
|
+import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
|
-import org.apache.hadoop.net.NodeBase;
|
|
|
import org.apache.hadoop.net.Node;
|
|
|
-import org.apache.hadoop.net.NetUtils;
|
|
|
+import org.apache.hadoop.net.NodeBase;
|
|
|
import org.apache.hadoop.net.ScriptBasedMapping;
|
|
|
import org.apache.hadoop.util.HostsFileReader;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
@@ -108,15 +106,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* jobs kept in memory per-user.
|
|
|
*/
|
|
|
final int MAX_COMPLETE_USER_JOBS_IN_MEMORY;
|
|
|
-
|
|
|
- /**
|
|
|
- * Used for formatting the id numbers
|
|
|
- */
|
|
|
- private static NumberFormat idFormat = NumberFormat.getInstance();
|
|
|
- static {
|
|
|
- idFormat.setMinimumIntegerDigits(4);
|
|
|
- idFormat.setGroupingUsed(false);
|
|
|
- }
|
|
|
|
|
|
private int nextJobId = 1;
|
|
|
|
|
@@ -183,10 +172,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
/**
|
|
|
* This is a map of the tasks that have been assigned to task trackers,
|
|
|
* but that have not yet been seen in a status report.
|
|
|
- * map: task-id (String) -> time-assigned (Long)
|
|
|
+ * map: task-id -> time-assigned
|
|
|
*/
|
|
|
- private Map<String, Long> launchingTasks =
|
|
|
- new LinkedHashMap<String, Long>();
|
|
|
+ private Map<TaskAttemptID, Long> launchingTasks =
|
|
|
+ new LinkedHashMap<TaskAttemptID, Long>();
|
|
|
|
|
|
public void run() {
|
|
|
while (true) {
|
|
@@ -197,11 +186,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
LOG.debug("Starting launching task sweep");
|
|
|
synchronized (JobTracker.this) {
|
|
|
synchronized (launchingTasks) {
|
|
|
- Iterator<Map.Entry<String, Long>> itr =
|
|
|
+ Iterator<Map.Entry<TaskAttemptID, Long>> itr =
|
|
|
launchingTasks.entrySet().iterator();
|
|
|
while (itr.hasNext()) {
|
|
|
- Map.Entry<String, Long> pair = itr.next();
|
|
|
- String taskId = pair.getKey();
|
|
|
+ Map.Entry<TaskAttemptID, Long> pair = itr.next();
|
|
|
+ TaskAttemptID taskId = pair.getKey();
|
|
|
long age = now - (pair.getValue()).longValue();
|
|
|
LOG.info(taskId + " is " + age + " ms debug.");
|
|
|
if (age > TASKTRACKER_EXPIRY_INTERVAL) {
|
|
@@ -243,14 +232,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void addNewTask(String taskName) {
|
|
|
+ public void addNewTask(TaskAttemptID taskName) {
|
|
|
synchronized (launchingTasks) {
|
|
|
launchingTasks.put(taskName,
|
|
|
System.currentTimeMillis());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void removeTask(String taskName) {
|
|
|
+ public void removeTask(TaskAttemptID taskName) {
|
|
|
synchronized (launchingTasks) {
|
|
|
launchingTasks.remove(taskName);
|
|
|
}
|
|
@@ -365,7 +354,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
synchronized (jobInitQueue) {
|
|
|
for (JobInProgress job: retiredJobs) {
|
|
|
removeJobTasks(job);
|
|
|
- jobs.remove(job.getProfile().getJobId());
|
|
|
+ jobs.remove(job.getProfile().getJobID());
|
|
|
jobInitQueue.remove(job);
|
|
|
jobsByPriority.remove(job);
|
|
|
String jobUser = job.getProfile().getUser();
|
|
@@ -380,7 +369,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
}
|
|
|
LOG.info("Retired job with id: '" +
|
|
|
- job.getProfile().getJobId() + "' of user '" +
|
|
|
+ job.getProfile().getJobID() + "' of user '" +
|
|
|
jobUser + "'");
|
|
|
}
|
|
|
}
|
|
@@ -536,7 +525,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
//
|
|
|
|
|
|
// All the known jobs. (jobid->JobInProgress)
|
|
|
- Map<String, JobInProgress> jobs = new TreeMap<String, JobInProgress>();
|
|
|
+ Map<JobID, JobInProgress> jobs = new TreeMap<JobID, JobInProgress>();
|
|
|
List<JobInProgress> jobsByPriority = new ArrayList<JobInProgress>();
|
|
|
|
|
|
// (user -> list of JobInProgress)
|
|
@@ -544,19 +533,19 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
new TreeMap<String, ArrayList<JobInProgress>>();
|
|
|
|
|
|
// All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
|
|
|
- Map<String, TaskInProgress> taskidToTIPMap =
|
|
|
- new TreeMap<String, TaskInProgress>();
|
|
|
+ Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =
|
|
|
+ new TreeMap<TaskAttemptID, TaskInProgress>();
|
|
|
|
|
|
// (taskid --> trackerID)
|
|
|
- TreeMap<String, String> taskidToTrackerMap = new TreeMap<String, String>();
|
|
|
+ TreeMap<TaskAttemptID, String> taskidToTrackerMap = new TreeMap<TaskAttemptID, String>();
|
|
|
|
|
|
// (trackerID->TreeSet of taskids running at that tracker)
|
|
|
- TreeMap<String, Set<String>> trackerToTaskMap =
|
|
|
- new TreeMap<String, Set<String>>();
|
|
|
+ TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap =
|
|
|
+ new TreeMap<String, Set<TaskAttemptID>>();
|
|
|
|
|
|
// (trackerID -> TreeSet of completed taskids running at that tracker)
|
|
|
- TreeMap<String, Set<String>> trackerToMarkedTasksMap =
|
|
|
- new TreeMap<String, Set<String>>();
|
|
|
+ TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap =
|
|
|
+ new TreeMap<String, Set<TaskAttemptID>>();
|
|
|
|
|
|
// (trackerID --> last sent HeartBeatResponse)
|
|
|
Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap =
|
|
@@ -879,16 +868,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
// Maintain lookup tables; called by JobInProgress
|
|
|
// and TaskInProgress
|
|
|
///////////////////////////////////////////////////////
|
|
|
- void createTaskEntry(String taskid, String taskTracker, TaskInProgress tip) {
|
|
|
+ void createTaskEntry(TaskAttemptID taskid, String taskTracker, TaskInProgress tip) {
|
|
|
LOG.info("Adding task '" + taskid + "' to tip " + tip.getTIPId() + ", for tracker '" + taskTracker + "'");
|
|
|
|
|
|
// taskid --> tracker
|
|
|
taskidToTrackerMap.put(taskid, taskTracker);
|
|
|
|
|
|
// tracker --> taskid
|
|
|
- Set<String> taskset = trackerToTaskMap.get(taskTracker);
|
|
|
+ Set<TaskAttemptID> taskset = trackerToTaskMap.get(taskTracker);
|
|
|
if (taskset == null) {
|
|
|
- taskset = new TreeSet<String>();
|
|
|
+ taskset = new TreeSet<TaskAttemptID>();
|
|
|
trackerToTaskMap.put(taskTracker, taskset);
|
|
|
}
|
|
|
taskset.add(taskid);
|
|
@@ -897,13 +886,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
taskidToTIPMap.put(taskid, tip);
|
|
|
}
|
|
|
|
|
|
- void removeTaskEntry(String taskid) {
|
|
|
+ void removeTaskEntry(TaskAttemptID taskid) {
|
|
|
// taskid --> tracker
|
|
|
String tracker = taskidToTrackerMap.remove(taskid);
|
|
|
|
|
|
// tracker --> taskid
|
|
|
if (tracker != null) {
|
|
|
- Set<String> trackerSet = trackerToTaskMap.get(tracker);
|
|
|
+ Set<TaskAttemptID> trackerSet = trackerToTaskMap.get(tracker);
|
|
|
if (trackerSet != null) {
|
|
|
trackerSet.remove(taskid);
|
|
|
}
|
|
@@ -922,11 +911,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* @param taskTracker the tasktracker at which the 'task' was running
|
|
|
* @param taskid completed (success/failure/killed) task
|
|
|
*/
|
|
|
- void markCompletedTaskAttempt(String taskTracker, String taskid) {
|
|
|
+ void markCompletedTaskAttempt(String taskTracker, TaskAttemptID taskid) {
|
|
|
// tracker --> taskid
|
|
|
- Set<String> taskset = trackerToMarkedTasksMap.get(taskTracker);
|
|
|
+ Set<TaskAttemptID> taskset = trackerToMarkedTasksMap.get(taskTracker);
|
|
|
if (taskset == null) {
|
|
|
- taskset = new TreeSet<String>();
|
|
|
+ taskset = new TreeSet<TaskAttemptID>();
|
|
|
trackerToMarkedTasksMap.put(taskTracker, taskset);
|
|
|
}
|
|
|
taskset.add(taskid);
|
|
@@ -946,7 +935,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
|
|
|
taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
|
|
|
markCompletedTaskAttempt(taskStatus.getTaskTracker(),
|
|
|
- taskStatus.getTaskId());
|
|
|
+ taskStatus.getTaskID());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -955,7 +944,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
|
|
|
taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
|
|
|
markCompletedTaskAttempt(taskStatus.getTaskTracker(),
|
|
|
- taskStatus.getTaskId());
|
|
|
+ taskStatus.getTaskID());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -970,10 +959,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
*/
|
|
|
private void removeMarkedTasks(String taskTracker) {
|
|
|
// Purge all the 'marked' tasks which were running at taskTracker
|
|
|
- Set<String> markedTaskSet =
|
|
|
+ Set<TaskAttemptID> markedTaskSet =
|
|
|
trackerToMarkedTasksMap.get(taskTracker);
|
|
|
if (markedTaskSet != null) {
|
|
|
- for (String taskid : markedTaskSet) {
|
|
|
+ for (TaskAttemptID taskid : markedTaskSet) {
|
|
|
removeTaskEntry(taskid);
|
|
|
LOG.info("Removed completed task '" + taskid + "' from '" +
|
|
|
taskTracker + "'");
|
|
@@ -997,12 +986,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
synchronized private void removeJobTasks(JobInProgress job) {
|
|
|
for (TaskInProgress tip : job.getMapTasks()) {
|
|
|
for (TaskStatus taskStatus : tip.getTaskStatuses()) {
|
|
|
- removeTaskEntry(taskStatus.getTaskId());
|
|
|
+ removeTaskEntry(taskStatus.getTaskID());
|
|
|
}
|
|
|
}
|
|
|
for (TaskInProgress tip : job.getReduceTasks()) {
|
|
|
for (TaskStatus taskStatus : tip.getTaskStatuses()) {
|
|
|
- removeTaskEntry(taskStatus.getTaskId());
|
|
|
+ removeTaskEntry(taskStatus.getTaskID());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1068,12 +1057,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
removeJobTasks(rjob);
|
|
|
|
|
|
userJobs.remove(0);
|
|
|
- jobs.remove(rjob.getProfile().getJobId());
|
|
|
+ jobs.remove(rjob.getProfile().getJobID());
|
|
|
jobInitQueue.remove(rjob);
|
|
|
jobsByPriority.remove(rjob);
|
|
|
|
|
|
LOG.info("Retired job with id: '" +
|
|
|
- rjob.getProfile().getJobId() + "' of user: '" +
|
|
|
+ rjob.getProfile().getJobID() + "' of user: '" +
|
|
|
jobUser + "'");
|
|
|
} else {
|
|
|
// Do not remove jobs that aren't complete.
|
|
@@ -1137,7 +1126,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
*/
|
|
|
public synchronized List<JobInProgress> getRunningJobs() {
|
|
|
synchronized (jobs) {
|
|
|
- return (List<JobInProgress>) runningJobs();
|
|
|
+ return runningJobs();
|
|
|
}
|
|
|
}
|
|
|
public Vector<JobInProgress> failedJobs() {
|
|
@@ -1308,7 +1297,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
if (acceptNewTasks) {
|
|
|
Task task = getNewTaskForTaskTracker(trackerName);
|
|
|
if (task != null) {
|
|
|
- LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskId());
|
|
|
+ LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
|
|
|
actions.add(new LaunchTaskAction(task));
|
|
|
}
|
|
|
}
|
|
@@ -1458,6 +1447,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
} catch (InterruptedException ie) {}
|
|
|
}
|
|
|
}
|
|
|
+ @Override
|
|
|
public void run() {
|
|
|
while (!isInterrupted()) {
|
|
|
try {
|
|
@@ -1584,7 +1574,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
|
|
|
Task t = job.obtainNewMapTask(tts, numTaskTrackers);
|
|
|
if (t != null) {
|
|
|
- expireLaunchingTasks.addNewTask(t.getTaskId());
|
|
|
+ expireLaunchingTasks.addNewTask(t.getTaskID());
|
|
|
myMetrics.launchMap();
|
|
|
return t;
|
|
|
}
|
|
@@ -1621,7 +1611,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
|
|
|
Task t = job.obtainNewReduceTask(tts, numTaskTrackers);
|
|
|
if (t != null) {
|
|
|
- expireLaunchingTasks.addNewTask(t.getTaskId());
|
|
|
+ expireLaunchingTasks.addNewTask(t.getTaskID());
|
|
|
myMetrics.launchReduce();
|
|
|
return t;
|
|
|
}
|
|
@@ -1654,11 +1644,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
private synchronized List<TaskTrackerAction> getTasksToKill(
|
|
|
String taskTracker) {
|
|
|
|
|
|
- Set<String> taskIds = trackerToTaskMap.get(taskTracker);
|
|
|
+ Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
|
|
|
if (taskIds != null) {
|
|
|
List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
|
|
|
- Set<String> killJobIds = new TreeSet<String>();
|
|
|
- for (String killTaskId : taskIds) {
|
|
|
+ Set<JobID> killJobIds = new TreeSet<JobID>();
|
|
|
+ for (TaskAttemptID killTaskId : taskIds) {
|
|
|
TaskInProgress tip = taskidToTIPMap.get(killTaskId);
|
|
|
if (tip.shouldClose(killTaskId)) {
|
|
|
//
|
|
@@ -1670,13 +1660,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
killList.add(new KillTaskAction(killTaskId));
|
|
|
LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId);
|
|
|
} else {
|
|
|
- String killJobId = tip.getJob().getStatus().getJobId();
|
|
|
+ JobID killJobId = tip.getJob().getStatus().getJobID();
|
|
|
killJobIds.add(killJobId);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- for (String killJobId : killJobIds) {
|
|
|
+ for (JobID killJobId : killJobIds) {
|
|
|
killList.add(new KillJobAction(killJobId));
|
|
|
LOG.debug(taskTracker + " -> KillJobAction: " + killJobId);
|
|
|
}
|
|
@@ -1726,10 +1716,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
/**
|
|
|
* Allocates a new JobId string.
|
|
|
*/
|
|
|
- public synchronized String getNewJobId() throws IOException {
|
|
|
+ public synchronized JobID getNewJobId() throws IOException {
|
|
|
ensureRunning();
|
|
|
- return "job_" + getTrackerIdentifier() + "_" +
|
|
|
- idFormat.format(nextJobId++);
|
|
|
+ return new JobID(getTrackerIdentifier(), nextJobId++);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1744,7 +1733,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* asynchronously to handle split-computation and build up
|
|
|
* the right TaskTracker/Block mapping.
|
|
|
*/
|
|
|
- public synchronized JobStatus submitJob(String jobId) throws IOException {
|
|
|
+ public synchronized JobStatus submitJob(JobID jobId) throws IOException {
|
|
|
ensureRunning();
|
|
|
if(jobs.containsKey(jobId)) {
|
|
|
//job already running, don't start twice
|
|
@@ -1756,7 +1745,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
synchronized (jobs) {
|
|
|
synchronized (jobsByPriority) {
|
|
|
synchronized (jobInitQueue) {
|
|
|
- jobs.put(job.getProfile().getJobId(), job);
|
|
|
+ jobs.put(job.getProfile().getJobID(), job);
|
|
|
jobsByPriority.add(job);
|
|
|
jobInitQueue.add(job);
|
|
|
resortPriority();
|
|
@@ -1805,12 +1794,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public synchronized void killJob(String jobid) {
|
|
|
+ public synchronized void killJob(JobID jobid) {
|
|
|
JobInProgress job = jobs.get(jobid);
|
|
|
job.kill();
|
|
|
}
|
|
|
|
|
|
- public synchronized JobProfile getJobProfile(String jobid) {
|
|
|
+ public synchronized JobProfile getJobProfile(JobID jobid) {
|
|
|
JobInProgress job = jobs.get(jobid);
|
|
|
if (job != null) {
|
|
|
return job.getProfile();
|
|
@@ -1818,7 +1807,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
return completedJobStatusStore.readJobProfile(jobid);
|
|
|
}
|
|
|
}
|
|
|
- public synchronized JobStatus getJobStatus(String jobid) {
|
|
|
+ public synchronized JobStatus getJobStatus(JobID jobid) {
|
|
|
JobInProgress job = jobs.get(jobid);
|
|
|
if (job != null) {
|
|
|
return job.getStatus();
|
|
@@ -1826,7 +1815,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
return completedJobStatusStore.readJobStatus(jobid);
|
|
|
}
|
|
|
}
|
|
|
- public synchronized Counters getJobCounters(String jobid) {
|
|
|
+ public synchronized Counters getJobCounters(JobID jobid) {
|
|
|
JobInProgress job = jobs.get(jobid);
|
|
|
if (job != null) {
|
|
|
return job.getCounters();
|
|
@@ -1834,7 +1823,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
return completedJobStatusStore.readCounters(jobid);
|
|
|
}
|
|
|
}
|
|
|
- public synchronized TaskReport[] getMapTaskReports(String jobid) {
|
|
|
+ public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
|
|
|
JobInProgress job = jobs.get(jobid);
|
|
|
if (job == null) {
|
|
|
return new TaskReport[0];
|
|
@@ -1856,7 +1845,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public synchronized TaskReport[] getReduceTaskReports(String jobid) {
|
|
|
+ public synchronized TaskReport[] getReduceTaskReports(JobID jobid) {
|
|
|
JobInProgress job = jobs.get(jobid);
|
|
|
if (job == null) {
|
|
|
return new TaskReport[0];
|
|
@@ -1882,8 +1871,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
|
|
|
*/
|
|
|
public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
|
|
|
- String jobid, int fromEventId, int maxEvents) throws IOException{
|
|
|
+ JobID jobid, int fromEventId, int maxEvents) throws IOException{
|
|
|
TaskCompletionEvent[] events;
|
|
|
+
|
|
|
JobInProgress job = this.jobs.get(jobid);
|
|
|
if (null != job) {
|
|
|
events = job.getTaskCompletionEvents(fromEventId, maxEvents);
|
|
@@ -1896,15 +1886,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
|
|
|
/**
|
|
|
* Get the diagnostics for a given task
|
|
|
- * @param jobId the id of the job
|
|
|
- * @param tipId the id of the tip
|
|
|
* @param taskId the id of the task
|
|
|
* @return an array of the diagnostic messages
|
|
|
*/
|
|
|
- public synchronized String[] getTaskDiagnostics(String jobId,
|
|
|
- String tipId,
|
|
|
- String taskId)
|
|
|
- throws IOException {
|
|
|
+ public synchronized String[] getTaskDiagnostics(TaskAttemptID taskId)
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ JobID jobId = taskId.getJobID();
|
|
|
+ TaskID tipId = taskId.getTaskID();
|
|
|
JobInProgress job = jobs.get(jobId);
|
|
|
if (job == null) {
|
|
|
throw new IllegalArgumentException("Job " + jobId + " not found.");
|
|
@@ -1919,15 +1908,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
|
|
|
/** Get all the TaskStatuses from the tipid. */
|
|
|
- TaskStatus[] getTaskStatuses(String jobid, String tipid) {
|
|
|
- TaskInProgress tip = getTip(jobid, tipid);
|
|
|
+ TaskStatus[] getTaskStatuses(TaskID tipid) {
|
|
|
+ TaskInProgress tip = getTip(tipid);
|
|
|
return (tip == null ? new TaskStatus[0]
|
|
|
: tip.getTaskStatuses());
|
|
|
}
|
|
|
|
|
|
/** Returns the TaskStatus for a particular taskid. */
|
|
|
- TaskStatus getTaskStatus(String jobid, String tipid, String taskid) {
|
|
|
- TaskInProgress tip = getTip(jobid, tipid);
|
|
|
+ TaskStatus getTaskStatus(TaskAttemptID taskid) {
|
|
|
+ TaskInProgress tip = getTip(taskid.getTaskID());
|
|
|
return (tip == null ? null
|
|
|
: tip.getTaskStatus(taskid));
|
|
|
}
|
|
@@ -1935,21 +1924,21 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
/**
|
|
|
* Returns the counters for the specified task in progress.
|
|
|
*/
|
|
|
- Counters getTipCounters(String jobid, String tipid) {
|
|
|
- TaskInProgress tip = getTip(jobid, tipid);
|
|
|
+ Counters getTipCounters(TaskID tipid) {
|
|
|
+ TaskInProgress tip = getTip(tipid);
|
|
|
return (tip == null ? null : tip.getCounters());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Returns specified TaskInProgress, or null.
|
|
|
*/
|
|
|
- private TaskInProgress getTip(String jobid, String tipid) {
|
|
|
- JobInProgress job = jobs.get(jobid);
|
|
|
+ private TaskInProgress getTip(TaskID tipid) {
|
|
|
+ JobInProgress job = jobs.get(tipid.getJobID());
|
|
|
return (job == null ? null : job.getTaskInProgress(tipid));
|
|
|
}
|
|
|
|
|
|
/** Mark a Task to be killed */
|
|
|
- public synchronized boolean killTask(String taskid, boolean shouldFail) throws IOException{
|
|
|
+ public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail) throws IOException{
|
|
|
TaskInProgress tip = taskidToTIPMap.get(taskid);
|
|
|
if(tip != null) {
|
|
|
return tip.killTask(taskid, shouldFail);
|
|
@@ -1965,7 +1954,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* @param taskId the name of the task
|
|
|
* @return The name of the task tracker
|
|
|
*/
|
|
|
- public synchronized String getAssignedTracker(String taskId) {
|
|
|
+ public synchronized String getAssignedTracker(TaskAttemptID taskId) {
|
|
|
return taskidToTrackerMap.get(taskId);
|
|
|
}
|
|
|
|
|
@@ -1999,7 +1988,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
///////////////////////////////////////////////////////////////
|
|
|
// JobTracker methods
|
|
|
///////////////////////////////////////////////////////////////
|
|
|
- public JobInProgress getJob(String jobid) {
|
|
|
+ public JobInProgress getJob(JobID jobid) {
|
|
|
return jobs.get(jobid);
|
|
|
}
|
|
|
|
|
@@ -2008,7 +1997,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* @param jobId job id
|
|
|
* @param priority new {@link JobPriority} for the job
|
|
|
*/
|
|
|
- synchronized void setJobPriority(String jobId, JobPriority priority) {
|
|
|
+ synchronized void setJobPriority(JobID jobId, JobPriority priority) {
|
|
|
JobInProgress job = jobs.get(jobId);
|
|
|
if (job != null) {
|
|
|
job.setPriority(priority);
|
|
@@ -2034,19 +2023,19 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
String trackerName = status.getTrackerName();
|
|
|
for (TaskStatus report : status.getTaskReports()) {
|
|
|
report.setTaskTracker(trackerName);
|
|
|
- String taskId = report.getTaskId();
|
|
|
+ TaskAttemptID taskId = report.getTaskID();
|
|
|
TaskInProgress tip = taskidToTIPMap.get(taskId);
|
|
|
if (tip == null) {
|
|
|
- LOG.info("Serious problem. While updating status, cannot find taskid " + report.getTaskId());
|
|
|
+ LOG.info("Serious problem. While updating status, cannot find taskid " + report.getTaskID());
|
|
|
} else {
|
|
|
expireLaunchingTasks.removeTask(taskId);
|
|
|
tip.getJob().updateTaskStatus(tip, report, myMetrics);
|
|
|
}
|
|
|
|
|
|
// Process 'failed fetch' notifications
|
|
|
- List<String> failedFetchMaps = report.getFetchFailedMaps();
|
|
|
+ List<TaskAttemptID> failedFetchMaps = report.getFetchFailedMaps();
|
|
|
if (failedFetchMaps != null) {
|
|
|
- for (String mapTaskId : failedFetchMaps) {
|
|
|
+ for (TaskAttemptID mapTaskId : failedFetchMaps) {
|
|
|
TaskInProgress failedFetchMap = taskidToTIPMap.get(mapTaskId);
|
|
|
|
|
|
if (failedFetchMap != null) {
|
|
@@ -2072,13 +2061,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
*/
|
|
|
void lostTaskTracker(String trackerName) {
|
|
|
LOG.info("Lost tracker '" + trackerName + "'");
|
|
|
- Set<String> lostTasks = trackerToTaskMap.get(trackerName);
|
|
|
+ Set<TaskAttemptID> lostTasks = trackerToTaskMap.get(trackerName);
|
|
|
trackerToTaskMap.remove(trackerName);
|
|
|
|
|
|
if (lostTasks != null) {
|
|
|
// List of jobs which had any of their tasks fail on this tracker
|
|
|
Set<JobInProgress> jobsWithFailures = new HashSet<JobInProgress>();
|
|
|
- for (String taskId : lostTasks) {
|
|
|
+ for (TaskAttemptID taskId : lostTasks) {
|
|
|
TaskInProgress tip = taskidToTIPMap.get(taskId);
|
|
|
|
|
|
// Completed reduce tasks never need to be failed, because
|
|
@@ -2147,6 +2136,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
public void run() {
|
|
|
int batchCommitSize = conf.getInt("jobtracker.task.commit.batch.size",
|
|
|
5000);
|
|
@@ -2160,7 +2150,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
|
|
|
JobInProgress[] jobs = new JobInProgress[jobList.size()];
|
|
|
TaskInProgress[] tips = new TaskInProgress[jobList.size()];
|
|
|
- String[] taskids = new String[jobList.size()];
|
|
|
+ TaskAttemptID[] taskids = new TaskAttemptID[jobList.size()];
|
|
|
JobTrackerMetrics[] metrics = new JobTrackerMetrics[jobList.size()];
|
|
|
|
|
|
Iterator<JobInProgress.JobWithTaskContext> iter = jobList.iterator();
|
|
@@ -2170,7 +2160,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
JobInProgress.JobWithTaskContext j = iter.next();
|
|
|
jobs[count] = j.getJob();
|
|
|
tips[count] = j.getTIP();
|
|
|
- taskids[count]= j.getTaskId();
|
|
|
+ taskids[count]= j.getTaskID();
|
|
|
metrics[count] = j.getJobTrackerMetrics();
|
|
|
++count;
|
|
|
}
|
|
@@ -2185,7 +2175,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
synchronized (jobs[i]) {
|
|
|
synchronized (tips[i]) {
|
|
|
status[i] = tips[i].getTaskStatus(taskids[i]);
|
|
|
- tasks[i] = tips[i].getTaskObject(taskids[i]);
|
|
|
+ tasks[i] = tips[i].getTask(taskids[i]);
|
|
|
states[i] = status[i].getRunState();
|
|
|
isTipComplete[i] = tips[i].isComplete();
|
|
|
}
|
|
@@ -2203,7 +2193,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
//is detected so that the JT can immediately schedule another
|
|
|
//attempt for that task.
|
|
|
|
|
|
- Set<String> seenTIPs = new HashSet<String>();
|
|
|
+ Set<TaskID> seenTIPs = new HashSet<TaskID>();
|
|
|
for(int index = 0; index < jobList.size(); ++index) {
|
|
|
try {
|
|
|
if (states[index] == TaskStatus.State.COMMIT_PENDING) {
|
|
@@ -2227,12 +2217,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
TaskStatus.Phase phase = (tips[index].isMapTask()
|
|
|
? TaskStatus.Phase.MAP
|
|
|
: TaskStatus.Phase.REDUCE);
|
|
|
- jobs[index].failedTask(tips[index], status[index].getTaskId(),
|
|
|
+ jobs[index].failedTask(tips[index], status[index].getTaskID(),
|
|
|
reason, phase, TaskStatus.State.FAILED,
|
|
|
status[index].getTaskTracker(), null);
|
|
|
}
|
|
|
LOG.info("Failed to rename the output of "
|
|
|
- + status[index].getTaskId() + " with "
|
|
|
+ + status[index].getTaskID() + " with "
|
|
|
+ StringUtils.stringifyException(ioe));
|
|
|
}
|
|
|
}
|
|
@@ -2255,7 +2245,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
states[i] = TaskStatus.State.SUCCEEDED;
|
|
|
}
|
|
|
} else {
|
|
|
- tips[i].addDiagnosticInfo(tasks[i].getTaskId(),
|
|
|
+ tips[i].addDiagnosticInfo(tasks[i].getTaskID(),
|
|
|
"Already completed TIP");
|
|
|
states[i] = TaskStatus.State.KILLED;
|
|
|
}
|
|
@@ -2296,7 +2286,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* @param jobId id of the job
|
|
|
* @return the path of the job conf file on the local file system
|
|
|
*/
|
|
|
- public static String getLocalJobFilePath(String jobId){
|
|
|
+ public static String getLocalJobFilePath(JobID jobId){
|
|
|
return JobHistory.JobInfo.getLocalJobFilePath(jobId);
|
|
|
}
|
|
|
////////////////////////////////////////////////////////////
|