|
@@ -72,7 +72,33 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
|
|
|
private int maxCurrentTasks;
|
|
|
private int failures;
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * A list of tips that should be cleaned up.
|
|
|
+ */
|
|
|
+ private BlockingQueue tasksToCleanup = new BlockingQueue();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * A daemon-thread that pulls tips off the list of things to cleanup.
|
|
|
+ */
|
|
|
+ private Thread taskCleanupThread =
|
|
|
+ new Thread(new Runnable() {
|
|
|
+ public void run() {
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ TaskInProgress tip = (TaskInProgress) tasksToCleanup.take();
|
|
|
+ tip.jobHasFinished();
|
|
|
+ } catch (Throwable except) {
|
|
|
+ LOG.warning(StringUtils.stringifyException(except));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ {
|
|
|
+ taskCleanupThread.setDaemon(true);
|
|
|
+ taskCleanupThread.start();
|
|
|
+ }
|
|
|
+
|
|
|
class MapOutputServer extends RPC.Server {
|
|
|
private MapOutputServer(int port, int threads) {
|
|
|
super(TaskTracker.this, fConf, port, threads, false);
|
|
@@ -108,11 +134,8 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
* so we can call it again and "recycle" the object after calling
|
|
|
* close().
|
|
|
*/
|
|
|
- void initialize() throws IOException {
|
|
|
+ synchronized void initialize() throws IOException {
|
|
|
this.localHostname = InetAddress.getLocalHost().getHostName();
|
|
|
- this.taskTrackerName = "tracker_" + localHostname + "_" +
|
|
|
- (Math.abs(r.nextInt()) % 100000);
|
|
|
- LOG.info("Starting tracker " + taskTrackerName);
|
|
|
|
|
|
fConf.deleteLocalFiles(SUBDIR);
|
|
|
|
|
@@ -148,6 +171,9 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
this.mapOutputPort++;
|
|
|
}
|
|
|
}
|
|
|
+ this.taskTrackerName = "tracker_" +
|
|
|
+ localHostname + ":" + taskReportPort;
|
|
|
+ LOG.info("Starting tracker " + taskTrackerName);
|
|
|
|
|
|
// Clear out temporary files that might be lying around
|
|
|
this.mapOutputFile.cleanupStorage();
|
|
@@ -323,12 +349,11 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
if (toCloseIds != null) {
|
|
|
synchronized (this) {
|
|
|
for (int i = 0; i < toCloseIds.length; i++) {
|
|
|
- TaskInProgress tip = (TaskInProgress) tasks.get(toCloseIds[i]);
|
|
|
- try {
|
|
|
- tip.jobHasFinished();
|
|
|
- } catch (IOException ie) {
|
|
|
- LOG.info("problem finishing task: " +
|
|
|
- StringUtils.stringifyException(ie));
|
|
|
+ Object tip = tasks.get(toCloseIds[i]);
|
|
|
+ if (tip != null) {
|
|
|
+ tasksToCleanup.put(tip);
|
|
|
+ } else {
|
|
|
+ LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -376,7 +401,6 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
* The server retry loop.
|
|
|
* This while-loop attempts to connect to the JobTracker. It only
|
|
@@ -414,6 +438,51 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * This class implements a queue that is put between producer and
|
|
|
+ * consumer threads. It will grow without bound.
|
|
|
+ * @author Owen O'Malley
|
|
|
+ */
|
|
|
+ static private class BlockingQueue {
|
|
|
+ private List queue;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create an empty queue.
|
|
|
+ */
|
|
|
+ public BlockingQueue() {
|
|
|
+ queue = new ArrayList();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Put the given object at the back of the queue.
|
|
|
+ * @param obj
|
|
|
+ */
|
|
|
+ public void put(Object obj) {
|
|
|
+ synchronized (queue) {
|
|
|
+ queue.add(obj);
|
|
|
+ queue.notify();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Take the object at the front of the queue.
|
|
|
+ * It blocks until there is an object available.
|
|
|
+ * @return the head of the queue
|
|
|
+ */
|
|
|
+ public Object take() {
|
|
|
+ synchronized (queue) {
|
|
|
+ while (queue.isEmpty()) {
|
|
|
+ try {
|
|
|
+ queue.wait();
|
|
|
+ } catch (InterruptedException ie) {}
|
|
|
+ }
|
|
|
+ Object result = queue.get(0);
|
|
|
+ queue.remove(0);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
///////////////////////////////////////////////////////
|
|
|
// TaskInProgress maintains all the info for a Task that
|
|
|
// lives at this TaskTracker. It maintains the Task object,
|
|
@@ -641,13 +710,19 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
|
|
|
* controlling job is all done and the files have been copied
|
|
|
* away, or the task failed and we don't need the remains.
|
|
|
*/
|
|
|
- synchronized void cleanup() throws IOException {
|
|
|
- tasks.remove(task.getTaskId());
|
|
|
- try {
|
|
|
- runner.close();
|
|
|
- } catch (IOException ie) {
|
|
|
+ void cleanup() throws IOException {
|
|
|
+ String taskId = task.getTaskId();
|
|
|
+ LOG.fine("Cleaning up " + taskId);
|
|
|
+ synchronized (TaskTracker.this) {
|
|
|
+ tasks.remove(taskId);
|
|
|
+ synchronized (this) {
|
|
|
+ try {
|
|
|
+ runner.close();
|
|
|
+ } catch (Throwable ie) {
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + task.getTaskId());
|
|
|
+ this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + taskId);
|
|
|
}
|
|
|
}
|
|
|
|