@@ -50,7 +50,7 @@ public class TaskTracker
     private long taskTimeout;
     private int httpPort;
 
-    static final int STALE_STATE = 1;
+    static enum State {NORMAL, STALE, INTERRUPTED}
 
     public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.mapred.TaskTracker");
@@ -71,12 +71,12 @@ public class TaskTracker
     boolean shuttingDown = false;
 
-    TreeMap tasks = null;
+    Map<String, TaskInProgress> tasks = null;
    /**
     * Map from taskId -> TaskInProgress.
     */
-    TreeMap runningTasks = null;
-    Map runningJobs = null;
+    Map<String, TaskInProgress> runningTasks = null;
+    Map<String, RunningJob> runningJobs = null;
    int mapTotal = 0;
    int reduceTotal = 0;
    boolean justStarted = true;
 
@@ -151,6 +151,41 @@ public class TaskTracker
       taskCleanupThread.start();
     }
 
+    private RunningJob addTaskToJob(String jobId,
+                                    Path localJobFile,
+                                    TaskInProgress tip) {
+      synchronized (runningJobs) {
+        RunningJob rJob = null;
+        if (!runningJobs.containsKey(jobId)) {
+          rJob = new RunningJob(localJobFile);
+          rJob.localized = false;
+          rJob.tasks = new HashSet();
+          rJob.jobFile = localJobFile;
+          runningJobs.put(jobId, rJob);
+        } else {
+          rJob = runningJobs.get(jobId);
+        }
+        rJob.tasks.add(tip);
+        return rJob;
+      }
+    }
+
+    private void removeTaskFromJob(String jobId, TaskInProgress tip) {
+      synchronized (runningJobs) {
+        RunningJob rjob = runningJobs.get(jobId);
+        if (rjob == null) {
+          LOG.warn("Unknown job " + jobId + " being deleted.");
+        } else {
+          synchronized (rjob) {
+            rjob.tasks.remove(tip);
+            if (rjob.tasks.isEmpty()) {
+              runningJobs.remove(jobId);
+            }
+          }
+        }
+      }
+    }
+
     static String getCacheSubdir() {
       return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
     }
@@ -229,30 +264,14 @@ public class TaskTracker
     private void localizeJob(TaskInProgress tip) throws IOException {
       Path localJarFile = null;
       Task t = tip.getTask();
-      Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t
-          .getJobId()
-          + Path.SEPARATOR + "job.xml"));
-      RunningJob rjob = null;
-      synchronized (runningJobs) {
-        if (!runningJobs.containsKey(t.getJobId())) {
-          rjob = new RunningJob();
-          rjob.localized = false;
-          rjob.tasks = new ArrayList();
-          rjob.jobFile = localJobFile;
-          rjob.tasks.add(tip);
-          runningJobs.put(t.getJobId(), rjob);
-        } else {
-          rjob = (RunningJob) runningJobs.get(t.getJobId());
-          // keep this for later use when we just get a jobid to delete
-          // the data for
-          rjob.tasks.add(tip);
-        }
-      }
+      String jobId = t.getJobId();
+      Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()),
+                                   jobId + Path.SEPARATOR + "job.xml");
+      RunningJob rjob = addTaskToJob(jobId, localJobFile, tip);
 
       synchronized (rjob) {
         if (!rjob.localized) {
-          localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t
-              .getJobId())
-              + Path.SEPARATOR + "job.jar");
+          localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()),
+                                  jobId + Path.SEPARATOR + "job.jar");
           String jobFile = t.getJobFile();
           fs.copyToLocalFile(new Path(jobFile), localJobFile);
@@ -385,28 +404,7 @@ public class TaskTracker
     public InterTrackerProtocol getJobClient() {
       return jobClient;
     }
-
-    /**
-     * Are we running under killall-less operating system.
-     */
-    private static boolean isWindows =
-      System.getProperty("os.name").startsWith("Windows");
-
-    /**
-     * Get the call stacks for all java processes on this system.
-     * Obviously, this is only useful for debugging.
-     */
-    private static void getCallStacks() {
-      if (LOG.isDebugEnabled() && !isWindows) {
-        try {
-          Process proc =
-            Runtime.getRuntime().exec("killall -QUIT java");
-          proc.waitFor();
-        } catch (IOException ie) {
-          LOG.warn(StringUtils.stringifyException(ie));
-        } catch (InterruptedException ie) {}
-      }
-    }
+
     /**Return the DFS filesystem
      * @return
      */
@@ -417,220 +415,227 @@ public class TaskTracker
     /**
      * Main service loop.  Will stay in this loop forever.
      */
-    int offerService() throws Exception {
+    State offerService() throws Exception {
       long lastHeartbeat = 0;
       this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
 
       while (running && !shuttingDown) {
+        try {
           long now = System.currentTimeMillis();
 
           long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);
           if (waitTime > 0) {
-          try {
-            // sleeps for the wait time, wakes up if a task is finished.
-            synchronized(finishedCount) {
-              if (finishedCount[0] == 0) {
-                finishedCount.wait(waitTime);
-              }
-              finishedCount[0] = 0;
-            }
-          } catch (InterruptedException ie) {
-          }
-        }
-        lastHeartbeat = now;
-
-        //
-        // Emit standard hearbeat message to check in with JobTracker
-        //
-        Vector taskReports = new Vector();
-        synchronized (this) {
-          for (Iterator it = runningTasks.values().iterator();
-               it.hasNext(); ) {
-            TaskInProgress tip = (TaskInProgress) it.next();
-            TaskStatus status = tip.createStatus();
-            taskReports.add(status);
+            // sleeps for the wait time, wakes up if a task is finished.
+            synchronized(finishedCount) {
+              if (finishedCount[0] == 0) {
+                finishedCount.wait(waitTime);
               }
-          }
-
-        //
-        // Xmit the heartbeat
-        //
-
-        TaskTrackerStatus status =
-          new TaskTrackerStatus(taskTrackerName, localHostname,
-                                httpPort, taskReports,
-                                failures);
-        int resultCode = jobClient.emitHeartbeat(status, justStarted);
-        synchronized (this) {
-          for (Iterator it = taskReports.iterator();
-               it.hasNext(); ) {
-            TaskStatus taskStatus = (TaskStatus) it.next();
-            if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
-              if (taskStatus.getIsMap()) {
-                mapTotal--;
-              } else {
-                reduceTotal--;
-              }
-              myMetrics.completeTask();
-              runningTasks.remove(taskStatus.getTaskId());
-            }
+              finishedCount[0] = 0;
             }
           }
-        justStarted = false;
-
-        if (resultCode == InterTrackerProtocol.UNKNOWN_TASKTRACKER) {
-          return STALE_STATE;
-        }
 
-        //
-        // Check if we should createRecord a new Task
-        //
-        try {
-          if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) && acceptNewTasks) {
-            checkLocalDirs(fConf.getLocalDirs());
-
-            if (enoughFreeSpace(minSpaceStart)) {
-              Task t = jobClient.pollForNewTask(taskTrackerName);
-              if (t != null) {
-                startNewTask(t);
-              }
-            }
-          }
-        } catch (DiskErrorException de ) {
-          LOG.warn("Exiting task tracker because "+de.getMessage());
-          jobClient.reportTaskTrackerError(taskTrackerName,
-                                           "DiskErrorException", de.getMessage());
-          return STALE_STATE;
-        } catch (IOException ie) {
-          LOG.info("Problem launching task: " +
-                   StringUtils.stringifyException(ie));
+          if (!transmitHeartBeat()) {
+            return State.STALE;
           }
+          lastHeartbeat = now;
+          justStarted = false;
 
-        //
-        // Kill any tasks that have not reported progress in the last X seconds.
-        //
-        synchronized (this) {
-          for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
-            TaskInProgress tip = (TaskInProgress) it.next();
-            long timeSinceLastReport = System.currentTimeMillis() -
-                                       tip.getLastProgressReport();
-            if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
-                (timeSinceLastReport > this.taskTimeout) &&
-                !tip.wasKilled) {
-              String msg = "Task failed to report status for " +
-                           (timeSinceLastReport / 1000) +
-                           " seconds. Killing.";
-              LOG.info(tip.getTask().getTaskId() + ": " + msg);
-              getCallStacks();
-              tip.reportDiagnosticInfo(msg);
-              try {
-                tip.killAndCleanup(true);
-              } catch (IOException ie) {
-                LOG.info("Problem cleaning task up: " +
-                         StringUtils.stringifyException(ie));
-              }
-            }
-          }
+          checkForNewTasks();
+          markUnresponsiveTasks();
+          closeCompletedTasks();
+          killOverflowingTasks();
+
+          //we've cleaned up, resume normal operation
+          if (!acceptNewTasks && tasks.isEmpty()) {
+            acceptNewTasks=true;
           }
+        } catch (InterruptedException ie) {
+          LOG.info("Interrupted. Closing down.");
+          return State.INTERRUPTED;
+        } catch (DiskErrorException de) {
+          String msg = "Exiting task tracker for disk error:\n" +
+                       StringUtils.stringifyException(de);
+          LOG.error(msg);
+          jobClient.reportTaskTrackerError(taskTrackerName,
+                                           "DiskErrorException", msg);
+          return State.STALE;
+        } catch (Exception except) {
+          String msg = "Caught exception: " +
+                       StringUtils.stringifyException(except);
+          LOG.error(msg);
+        }
+      }
 
-      //
-      // Check for any Tasks that should be killed, even if
-      // the containing Job is still ongoing.  (This happens
-      // with speculative execution, when one version of the
-      // task finished before another
-      //
+      return State.NORMAL;
+    }
 
-      //
-      // Check for any Tasks whose job may have ended
-      //
-      try {
-        String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName);
-        if (toCloseIds != null) {
-          synchronized (this) {
-            for (int i = 0; i < toCloseIds.length; i++) {
-              Object tip = tasks.get(toCloseIds[i]);
-              synchronized(runningJobs){
-                runningJobs.remove(((TaskInProgress)
-                    tasks.get(toCloseIds[i])).getTask().getJobId());
-              }
-              if (tip != null) {
-                tasksToCleanup.put(tip);
-              } else {
-                LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]);
-              }
+    /**
+     * Build and transmit the heart beat to the JobTracker
+     * @return false if the tracker was unknown
+     * @throws IOException
+     */
+    private boolean transmitHeartBeat() throws IOException {
+      //
+      // Build the heartbeat information for the JobTracker
+      //
+      List<TaskStatus> taskReports = new ArrayList(runningTasks.size());
+      synchronized (this) {
+        for (TaskInProgress tip: runningTasks.values()) {
+          taskReports.add(tip.createStatus());
+        }
+      }
+      TaskTrackerStatus status =
+        new TaskTrackerStatus(taskTrackerName, localHostname,
+                              httpPort, taskReports,
+                              failures);
+
+      //
+      // Xmit the heartbeat
+      //
+
+      int resultCode = jobClient.emitHeartbeat(status, justStarted);
+      synchronized (this) {
+        for (TaskStatus taskStatus: taskReports) {
+          if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+            if (taskStatus.getIsMap()) {
+              mapTotal--;
+            } else {
+              reduceTotal--;
             }
-          }
+            myMetrics.completeTask();
+            runningTasks.remove(taskStatus.getTaskId());
           }
-      } catch (IOException ie) {
-        LOG.info("Problem getting closed tasks: " +
-                 StringUtils.stringifyException(ie));
-      }
-
-      //Check if we're dangerously low on disk space
-      // If so, kill jobs to free up space and make sure
-      // we don't accept any new tasks
-      // Try killing the reduce jobs first, since I believe they
-      // use up most space
-      // Then pick the one with least progress
-
-      if (!enoughFreeSpace(minSpaceKill)) {
-        acceptNewTasks=false;
-        //we give up! do not accept new tasks until
-        //all the ones running have finished and they're all cleared up
-        synchronized (this) {
-          TaskInProgress killMe = null;
-
-          for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
-            TaskInProgress tip = (TaskInProgress) it.next();
-            if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
-                !tip.wasKilled) {
-
-              if (killMe == null) {
-                killMe = tip;
-
-              } else if (!tip.getTask().isMapTask()) {
-                //reduce task, give priority
-                if (killMe.getTask().isMapTask() ||
-                    (tip.getTask().getProgress().get() <
-                     killMe.getTask().getProgress().get())) {
-
-                  killMe = tip;
-                }
-
-              } else if (killMe.getTask().isMapTask() &&
-                         tip.getTask().getProgress().get() <
-                         killMe.getTask().getProgress().get()) {
-                //map task, only add if the progress is lower
-
-                killMe = tip;
-              }
-            }
-          }
+          }
+        }
+      return resultCode != InterTrackerProtocol.UNKNOWN_TASKTRACKER;
+    }
 
-          if (killMe!=null) {
-            String msg = "Tasktracker running out of space. Killing task.";
-            LOG.info(killMe.getTask().getTaskId() + ": " + msg);
-            killMe.reportDiagnosticInfo(msg);
-            try {
-              killMe.killAndCleanup(true);
-            } catch (IOException ie) {
-              LOG.info("Problem cleaning task up: " +
-                       StringUtils.stringifyException(ie));
-            }
-          }
-        }
+    /**
+     * Check to see if there are any new tasks that we should run.
+     * @throws IOException
+     */
+    private void checkForNewTasks() throws IOException {
+      //
+      // Check if we should ask for a new Task
+      //
+      if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) &&
+          acceptNewTasks) {
+        checkLocalDirs(fConf.getLocalDirs());
+
+        if (enoughFreeSpace(minSpaceStart)) {
+          Task t = jobClient.pollForNewTask(taskTrackerName);
+          if (t != null) {
+            startNewTask(t);
+          }
+        }
+      }
+    }
+
+    /**
+     * Kill any tasks that have not reported progress in the last X seconds.
+     */
+    private synchronized void markUnresponsiveTasks() throws IOException {
+      long now = System.currentTimeMillis();
+      for (TaskInProgress tip: runningTasks.values()) {
+        long timeSinceLastReport = now - tip.getLastProgressReport();
+        if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
+            (timeSinceLastReport > this.taskTimeout) &&
+            !tip.wasKilled) {
+          String msg = "Task failed to report status for " +
+                       (timeSinceLastReport / 1000) +
+                       " seconds. Killing.";
+          LOG.info(tip.getTask().getTaskId() + ": " + msg);
+          ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
+          tip.reportDiagnosticInfo(msg);
+          tasksToCleanup.put(tip);
         }
       }
+      }
+    }
 
-
-      //we've cleaned up, resume normal operation
-      if (!acceptNewTasks && tasks.isEmpty()) {
-        acceptNewTasks=true;
+    /**
+     * Ask the JobTracker if there are any tasks that we should clean up,
+     * either because we don't need them any more or because the job is done.
+     */
+    private void closeCompletedTasks() throws IOException {
+      String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName);
+      if (toCloseIds != null) {
+        synchronized (this) {
+          for (int i = 0; i < toCloseIds.length; i++) {
+            TaskInProgress tip = tasks.get(toCloseIds[i]);
+            if (tip != null) {
+              // remove the task from running jobs, removing the job if
+              // it is the last task
+              removeTaskFromJob(tip.getTask().getJobId(), tip);
+              tasksToCleanup.put(tip);
+            } else {
+              LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]);
+            }
+          }
         }
+      }
+    }
 
-      return 0;
+    /** Check if we're dangerously low on disk space
+     * If so, kill jobs to free up space and make sure
+     * we don't accept any new tasks
+     * Try killing the reduce jobs first, since I believe they
+     * use up most space
+     * Then pick the one with least progress
+     */
+    private void killOverflowingTasks() throws IOException {
+      if (!enoughFreeSpace(minSpaceKill)) {
+        acceptNewTasks=false;
+        //we give up! do not accept new tasks until
+        //all the ones running have finished and they're all cleared up
+        synchronized (this) {
+          TaskInProgress killMe = findTaskToKill();
+
+          if (killMe!=null) {
+            String msg = "Tasktracker running out of space." +
+                         " Killing task.";
+            LOG.info(killMe.getTask().getTaskId() + ": " + msg);
+            killMe.reportDiagnosticInfo(msg);
+            tasksToCleanup.put(killMe);
+          }
+        }
+      }
     }
+
+    /**
+     * Pick a task to kill to free up space
+     * @return the task to kill or null, if one wasn't found
+     */
+    private TaskInProgress findTaskToKill() {
+      TaskInProgress killMe = null;
+      for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
+        TaskInProgress tip = (TaskInProgress) it.next();
+        if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
+            !tip.wasKilled) {
+
+          if (killMe == null) {
+            killMe = tip;
+
+          } else if (!tip.getTask().isMapTask()) {
+            //reduce task, give priority
+            if (killMe.getTask().isMapTask() ||
+                (tip.getTask().getProgress().get() <
+                 killMe.getTask().getProgress().get())) {
+
+              killMe = tip;
+            }
+
+          } else if (killMe.getTask().isMapTask() &&
+                     tip.getTask().getProgress().get() <
+                     killMe.getTask().getProgress().get()) {
+            //map task, only add if the progress is lower
+
+            killMe = tip;
+          }
+        }
+      }
+      return killMe;
+    }
+
     /**
      * Check if all of the local directories have enough
      * free space
@@ -640,6 +645,9 @@ public class TaskTracker
      * @throws IOException
      */
     private boolean enoughFreeSpace(long minSpace) throws IOException {
+      if (minSpace == 0) {
+        return true;
+      }
       String[] localDirs = fConf.getLocalDirs();
       for (int i = 0; i < localDirs.length; i++) {
         DF df = null;
@@ -704,7 +712,7 @@ public class TaskTracker
       // This while-loop attempts reconnects if we get network errors
       while (running && ! staleState && !shuttingDown ) {
         try {
-          if (offerService() == STALE_STATE) {
+          if (offerService() == State.STALE) {
             staleState = true;
           }
         } catch (Exception ex) {
@@ -722,11 +730,12 @@ public class TaskTracker
             close();
           }
           if (shuttingDown) { return; }
-          LOG.info("Reinitializing local state");
+          LOG.warn("Reinitializing local state");
           initialize();
         }
       } catch (IOException iex) {
-        LOG.info("Got fatal exception while reinitializing TaskTracker: " + iex.toString());
+        LOG.error("Got fatal exception while reinitializing TaskTracker: " +
+                  StringUtils.stringifyException(iex));
         return;
       }
     }
@@ -811,7 +820,8 @@ public class TaskTracker
                                    progress, runstate,
                                    diagnosticInfo.toString(),
                                    "initializing",
-                                   getName(), task.isMapTask()?Phase.MAP:Phase.SHUFFLE);
+                                   getName(), task.isMapTask()? TaskStatus.Phase.MAP:
+                                   TaskStatus.Phase.SHUFFLE);
         keepJobFiles = false;
       }
 
@@ -884,17 +894,18 @@ public class TaskTracker
       /**
        * The task is reporting its progress
        */
-      public synchronized void reportProgress(float p, String state, Phase newPhase) {
+      public synchronized void reportProgress(float p, String state,
+                                              TaskStatus.Phase newPhase) {
         LOG.info(task.getTaskId()+" "+p+"% "+state);
         this.progress = p;
         this.runstate = TaskStatus.State.RUNNING;
         this.lastProgressReport = System.currentTimeMillis();
-        Phase oldPhase = taskStatus.getPhase() ;
+        TaskStatus.Phase oldPhase = taskStatus.getPhase() ;
         if( oldPhase != newPhase ){
           // sort phase started
-          if( newPhase == Phase.SORT ){
+          if( newPhase == TaskStatus.Phase.SORT ){
             this.taskStatus.setShuffleFinishTime(System.currentTimeMillis());
-          }else if( newPhase == Phase.REDUCE){
+          }else if( newPhase == TaskStatus.Phase.REDUCE){
             this.taskStatus.setSortFinishTime(System.currentTimeMillis());
           }
         }
@@ -1068,6 +1079,16 @@ public class TaskTracker
                            JOBCACHE + Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
                            taskId);
       }
+
+      public boolean equals(Object obj) {
+        return (obj instanceof TaskInProgress) &&
+          task.getTaskId().equals
+            (((TaskInProgress) obj).getTask().getTaskId());
+      }
+
+      public int hashCode() {
+        return task.getTaskId().hashCode();
+      }
     }
 
 
@@ -1089,7 +1110,10 @@ public class TaskTracker
     /**
      * Called periodically to report Task progress, from 0.0 to 1.0.
      */
-    public synchronized void progress(String taskid, float progress, String state, Phase phase) throws IOException {
+    public synchronized void progress(String taskid, float progress,
+                                      String state,
+                                      TaskStatus.Phase phase
+                                      ) throws IOException {
       TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
       if (tip != null) {
         tip.reportProgress(progress, state, phase);
@@ -1150,7 +1174,7 @@ public class TaskTracker
         tip.taskFinished();
         synchronized(finishedCount) {
           finishedCount[0]++;
-          finishedCount.notifyAll();
+          finishedCount.notify();
         }
       } else {
         LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
@@ -1176,8 +1200,14 @@ public class TaskTracker
     static class RunningJob{
       Path jobFile;
       // keep this for later use
-      ArrayList tasks;
+      Set<TaskInProgress> tasks;
       boolean localized;
+
+      RunningJob(Path jobFile) {
+        localized = false;
+        tasks = new HashSet();
+        this.jobFile = jobFile;
+      }
     }
 
     /**
@@ -1239,7 +1269,7 @@ public class TaskTracker
           LOG.info("Ping exception: " + msg);
           remainingRetries -=1;
           if (remainingRetries == 0) {
-            getCallStacks();
+            ReflectionUtils.logThreadInfo(LOG, "ping exception", 0);
             LOG.warn("Last retry, killing "+taskid);
             System.exit(65);
           }
@@ -1329,9 +1359,11 @@ public class TaskTracker
         System.out.println("usage: TaskTracker");
         System.exit(-1);
       }
-
       try {
         JobConf conf=new JobConf();
+        // enable the server to track time spent waiting on locks
+        ReflectionUtils.setContentionTracing
+            (conf.getBoolean("tasktracker.contention.tracking", false));
         new TaskTracker(conf).run();
       } catch (IOException e) {
         LOG.warn( "Can not start task tracker because "+