|
@@ -19,7 +19,7 @@ import org.apache.hadoop.fs.*;
|
|
|
import org.apache.hadoop.io.*;
|
|
|
import org.apache.hadoop.ipc.*;
|
|
|
import org.apache.hadoop.conf.*;
|
|
|
-import org.apache.hadoop.util.LogFormatter;
|
|
|
+import org.apache.hadoop.util.*;
|
|
|
|
|
|
import java.io.*;
|
|
|
import java.net.*;
|
|
@@ -106,9 +106,10 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
* close().
|
|
|
*/
|
|
|
void initialize() throws IOException {
|
|
|
- this.taskTrackerName = "tracker_" + (Math.abs(r.nextInt()) % 100000);
|
|
|
- LOG.info("Starting tracker " + taskTrackerName);
|
|
|
this.localHostname = InetAddress.getLocalHost().getHostName();
|
|
|
+ this.taskTrackerName = "tracker_" + localHostname + "_" +
|
|
|
+ (Math.abs(r.nextInt()) % 100000);
|
|
|
+ LOG.info("Starting tracker " + taskTrackerName);
|
|
|
|
|
|
new JobConf(this.fConf).deleteLocalFiles(SUBDIR);
|
|
|
|
|
@@ -267,17 +268,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
if (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) {
|
|
|
Task t = jobClient.pollForNewTask(taskTrackerName);
|
|
|
if (t != null) {
|
|
|
- TaskInProgress tip = new TaskInProgress(t, this.fConf);
|
|
|
- synchronized (this) {
|
|
|
- tasks.put(t.getTaskId(), tip);
|
|
|
- if (t.isMapTask()) {
|
|
|
- mapTotal++;
|
|
|
- } else {
|
|
|
- reduceTotal++;
|
|
|
- }
|
|
|
- runningTasks.put(t.getTaskId(), tip);
|
|
|
- }
|
|
|
- tip.launchTask();
|
|
|
+ startNewTask(t);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -321,6 +312,39 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Start a new task.
|
|
|
+ * All exceptions are handled locally, so that we don't mess up the
|
|
|
+ * task tracker.
|
|
|
+ */
|
|
|
+ private void startNewTask(Task t) {
|
|
|
+ TaskInProgress tip = new TaskInProgress(t, this.fConf);
|
|
|
+ synchronized (this) {
|
|
|
+ tasks.put(t.getTaskId(), tip);
|
|
|
+ runningTasks.put(t.getTaskId(), tip);
|
|
|
+ boolean isMap = t.isMapTask();
|
|
|
+ if (isMap) {
|
|
|
+ mapTotal++;
|
|
|
+ } else {
|
|
|
+ reduceTotal++;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ tip.launchTask();
|
|
|
+ } catch (Throwable ie) {
|
|
|
+ tip.runstate = TaskStatus.FAILED;
|
|
|
+ try {
|
|
|
+ tip.cleanup();
|
|
|
+ } catch (Throwable ie2) {
|
|
|
+ // Ignore it, we are just trying to cleanup.
|
|
|
+ }
|
|
|
+ String error = StringUtils.stringifyException(ie);
|
|
|
+ tip.reportDiagnosticInfo(error);
|
|
|
+ LOG.info(error);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* The server retry loop.
|
|
|
* This while-loop attempts to connect to the JobTracker. It only
|
|
@@ -377,12 +401,13 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
|
|
|
/**
|
|
|
*/
|
|
|
- public TaskInProgress(Task task, Configuration conf) throws IOException {
|
|
|
+ public TaskInProgress(Task task, Configuration conf) {
|
|
|
this.task = task;
|
|
|
+ this.progress = 0.0f;
|
|
|
+ this.runstate = TaskStatus.UNASSIGNED;
|
|
|
+ stateString = "initializing";
|
|
|
this.lastProgressReport = System.currentTimeMillis();
|
|
|
this.jobConf = new JobConf(conf);
|
|
|
- this.jobConf.deleteLocalFiles(SUBDIR + "/" + task.getTaskId());
|
|
|
- localizeTask(task);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -390,6 +415,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
* So here, edit the Task's fields appropriately.
|
|
|
*/
|
|
|
void localizeTask(Task t) throws IOException {
|
|
|
+ this.jobConf.deleteLocalFiles(SUBDIR + "/" + task.getTaskId());
|
|
|
Path localJobFile =
|
|
|
this.jobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.xml");
|
|
|
Path localJarFile =
|
|
@@ -436,9 +462,8 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
* Kick off the task execution
|
|
|
*/
|
|
|
public synchronized void launchTask() throws IOException {
|
|
|
- this.progress = 0.0f;
|
|
|
+ localizeTask(task);
|
|
|
this.runstate = TaskStatus.RUNNING;
|
|
|
- this.diagnosticInfo = new StringBuffer();
|
|
|
this.runner = task.createRunner(TaskTracker.this);
|
|
|
this.runner.start();
|
|
|
}
|