|
@@ -16,7 +16,6 @@
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
import org.apache.hadoop.fs.*;
|
|
|
-import org.apache.hadoop.io.*;
|
|
|
import org.apache.hadoop.ipc.*;
|
|
|
import org.apache.hadoop.util.*;
|
|
|
|
|
@@ -33,8 +32,10 @@ import java.util.logging.*;
|
|
|
* @author Mike Cafarella
|
|
|
*******************************************************/
|
|
|
public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutputProtocol, Runnable {
|
|
|
+ private static TaskTracker taskTracker = null;
|
|
|
static final long WAIT_FOR_DONE = 3 * 1000;
|
|
|
private long taskTimeout;
|
|
|
+ private int httpPort;
|
|
|
|
|
|
static final int STALE_STATE = 1;
|
|
|
|
|
@@ -111,14 +112,15 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
/**
|
|
|
* Start with the local machine name, and the default JobTracker
|
|
|
*/
|
|
|
- public TaskTracker(JobConf conf) throws IOException {
|
|
|
- this(JobTracker.getAddress(conf), conf);
|
|
|
+ public TaskTracker(JobConf conf, int httpPort) throws IOException {
|
|
|
+ this(JobTracker.getAddress(conf), conf, httpPort);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Start with the local machine name, and the addr of the target JobTracker
|
|
|
*/
|
|
|
- public TaskTracker(InetSocketAddress jobTrackAddr, JobConf conf) throws IOException {
|
|
|
+ public TaskTracker(InetSocketAddress jobTrackAddr, JobConf conf,
|
|
|
+ int httpPort) throws IOException {
|
|
|
maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
|
|
|
|
|
|
this.fConf = conf;
|
|
@@ -126,6 +128,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000);
|
|
|
this.mapOutputFile = new MapOutputFile();
|
|
|
this.mapOutputFile.setConf(conf);
|
|
|
+ this.httpPort = httpPort;
|
|
|
initialize();
|
|
|
}
|
|
|
|
|
@@ -285,7 +288,8 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
|
|
|
TaskTrackerStatus status =
|
|
|
new TaskTrackerStatus(taskTrackerName, localHostname,
|
|
|
- mapOutputPort, taskReports, failures);
|
|
|
+ mapOutputPort, httpPort, taskReports,
|
|
|
+ failures);
|
|
|
int resultCode = jobClient.emitHeartbeat(status, justStarted);
|
|
|
justStarted = false;
|
|
|
|
|
@@ -905,10 +909,48 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get the task tracker for use with the webapp stuff.
|
|
|
+ * @return The task tracker object
|
|
|
+ */
|
|
|
+ static TaskTracker getTracker() {
|
|
|
+ return taskTracker;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the name for this task tracker.
|
|
|
+ * @return the string like "tracker_mymachine:50010"
|
|
|
+ */
|
|
|
+ String getName() {
|
|
|
+ return taskTrackerName;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the list of tasks that will be reported back to the
|
|
|
+ * job tracker in the next heartbeat cycle.
|
|
|
+ * @return a copy of the list of TaskStatus objects
|
|
|
+ */
|
|
|
+ synchronized List getRunningTaskStatuses() {
|
|
|
+ List result = new ArrayList(runningTasks.size());
|
|
|
+ Iterator itr = runningTasks.values().iterator();
|
|
|
+ while (itr.hasNext()) {
|
|
|
+ TaskInProgress tip = (TaskInProgress) itr.next();
|
|
|
+ result.add(tip.createStatus());
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the default job conf for this tracker.
|
|
|
+ */
|
|
|
+ JobConf getJobConf() {
|
|
|
+ return fConf;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Start the TaskTracker, point toward the indicated JobTracker
|
|
|
*/
|
|
|
- public static void main(String argv[]) throws IOException {
|
|
|
+ public static void main(String argv[]) throws Exception {
|
|
|
if (argv.length != 0) {
|
|
|
System.out.println("usage: TaskTracker");
|
|
|
System.exit(-1);
|
|
@@ -916,7 +958,10 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
|
|
|
JobConf conf=new JobConf();
|
|
|
LogFormatter.initFileHandler( conf, "tasktracker" );
|
|
|
- TaskTracker tt = new TaskTracker(conf);
|
|
|
- tt.run();
|
|
|
+ int httpPort = conf.getInt("tasktracker.http.port", 50060);
|
|
|
+ StatusHttpServer server = new StatusHttpServer("task", httpPort);
|
|
|
+ server.start();
|
|
|
+ taskTracker = new TaskTracker(conf, httpPort);
|
|
|
+ taskTracker.run();
|
|
|
}
|
|
|
}
|