|
@@ -33,6 +33,7 @@ import java.util.logging.*;
|
|
|
* @author Mike Cafarella
|
|
|
*******************************************************/
|
|
|
public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmissionProtocol {
|
|
|
+ static long JOBINIT_SLEEP_INTERVAL = 2000;
|
|
|
static long RETIRE_JOB_INTERVAL;
|
|
|
static long RETIRE_JOB_CHECK_INTERVAL;
|
|
|
static float TASK_ALLOC_EPSILON;
|
|
@@ -156,14 +157,21 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
|
|
|
synchronized (jobs) {
|
|
|
- for (Iterator it = jobs.keySet().iterator(); it.hasNext(); ) {
|
|
|
- String jobid = (String) it.next();
|
|
|
- JobInProgress job = (JobInProgress) jobs.get(jobid);
|
|
|
-
|
|
|
- if (job.getStatus().getRunState() != JobStatus.RUNNING &&
|
|
|
- (job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) {
|
|
|
- it.remove();
|
|
|
- jobsByArrival.remove(job);
|
|
|
+ synchronized (jobInitQueue) {
|
|
|
+ synchronized (jobsByArrival) {
|
|
|
+ for (Iterator it = jobs.keySet().iterator(); it.hasNext(); ) {
|
|
|
+ String jobid = (String) it.next();
|
|
|
+ JobInProgress job = (JobInProgress) jobs.get(jobid);
|
|
|
+
|
|
|
+ if (job.getStatus().getRunState() != JobStatus.RUNNING &&
|
|
|
+ job.getStatus().getRunState() != JobStatus.PREP &&
|
|
|
+ (job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) {
|
|
|
+ it.remove();
|
|
|
+
|
|
|
+ jobInitQueue.remove(job);
|
|
|
+ jobsByArrival.remove(job);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -174,6 +182,43 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /////////////////////////////////////////////////////////////////
|
|
|
+ // Used to init new jobs that have just been created
|
|
|
+ /////////////////////////////////////////////////////////////////
|
|
|
+ class JobInitThread implements Runnable {
|
|
|
+ boolean shouldRun = true;
|
|
|
+ public JobInitThread() {
|
|
|
+ }
|
|
|
+ public void run() {
|
|
|
+ while (shouldRun) {
|
|
|
+ JobInProgress job = null;
|
|
|
+ synchronized (jobInitQueue) {
|
|
|
+ if (jobInitQueue.size() > 0) {
|
|
|
+ job = (JobInProgress) jobInitQueue.elementAt(0);
|
|
|
+ jobInitQueue.remove(job);
|
|
|
+ } else {
|
|
|
+ try {
|
|
|
+ jobInitQueue.wait(JOBINIT_SLEEP_INTERVAL);
|
|
|
+ } catch (InterruptedException iex) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ if (job != null) {
|
|
|
+ job.initTasks();
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.log(Level.WARNING, "job init failed", e);
|
|
|
+ job.kill();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ public void stopIniter() {
|
|
|
+ shouldRun = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/////////////////////////////////////////////////////////////////
|
|
|
// The real JobTracker
|
|
|
////////////////////////////////////////////////////////////////
|
|
@@ -221,8 +266,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
int totalMaps = 0;
|
|
|
int totalReduces = 0;
|
|
|
TreeMap taskTrackers = new TreeMap();
|
|
|
+ Vector jobInitQueue = new Vector();
|
|
|
ExpireTrackers expireTrackers = new ExpireTrackers();
|
|
|
RetireJobs retireJobs = new RetireJobs();
|
|
|
+ JobInitThread initJobs = new JobInitThread();
|
|
|
|
|
|
/**
|
|
|
* It might seem like a bug to maintain a TreeSet of status objects,
|
|
@@ -307,6 +354,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
|
|
|
new Thread(this.expireTrackers).start();
|
|
|
new Thread(this.retireJobs).start();
|
|
|
+ new Thread(this.initJobs).start();
|
|
|
}
|
|
|
|
|
|
public static InetSocketAddress getAddress(Configuration conf) {
|
|
@@ -521,67 +569,69 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
// has not yet been removed from the pool, making capacity seem
|
|
|
// larger than it really is.)
|
|
|
//
|
|
|
- if ((numMaps < maxCurrentTasks) &&
|
|
|
- (numMaps <= (avgMaps + TASK_ALLOC_EPSILON))) {
|
|
|
-
|
|
|
- int totalNeededMaps = 0;
|
|
|
- for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
|
|
|
- JobInProgress job = (JobInProgress) it.next();
|
|
|
- if (job.getStatus().getRunState() != JobStatus.RUNNING) {
|
|
|
- continue;
|
|
|
- }
|
|
|
+ synchronized (jobsByArrival) {
|
|
|
+ if ((numMaps < maxCurrentTasks) &&
|
|
|
+ (numMaps <= (avgMaps + TASK_ALLOC_EPSILON))) {
|
|
|
+
|
|
|
+ int totalNeededMaps = 0;
|
|
|
+ for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
|
|
|
+ JobInProgress job = (JobInProgress) it.next();
|
|
|
+ if (job.getStatus().getRunState() != JobStatus.RUNNING) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
- Task t = job.obtainNewMapTask(taskTracker, tts);
|
|
|
- if (t != null) {
|
|
|
- return t;
|
|
|
- }
|
|
|
+ Task t = job.obtainNewMapTask(taskTracker, tts);
|
|
|
+ if (t != null) {
|
|
|
+ return t;
|
|
|
+ }
|
|
|
|
|
|
- //
|
|
|
- // Beyond the highest-priority task, reserve a little
|
|
|
- // room for failures and speculative executions; don't
|
|
|
- // schedule tasks to the hilt.
|
|
|
- //
|
|
|
- totalNeededMaps += job.desiredMaps();
|
|
|
- double padding = 0;
|
|
|
- if (totalCapacity > MIN_SLOTS_FOR_PADDING) {
|
|
|
- padding = Math.min(maxCurrentTasks, totalNeededMaps * PAD_FRACTION);
|
|
|
- }
|
|
|
- if (totalNeededMaps + padding >= totalCapacity) {
|
|
|
- break;
|
|
|
+ //
|
|
|
+ // Beyond the highest-priority task, reserve a little
|
|
|
+ // room for failures and speculative executions; don't
|
|
|
+ // schedule tasks to the hilt.
|
|
|
+ //
|
|
|
+ totalNeededMaps += job.desiredMaps();
|
|
|
+ double padding = 0;
|
|
|
+ if (totalCapacity > MIN_SLOTS_FOR_PADDING) {
|
|
|
+ padding = Math.min(maxCurrentTasks, totalNeededMaps * PAD_FRACTION);
|
|
|
+ }
|
|
|
+ if (totalNeededMaps + padding >= totalCapacity) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- //
|
|
|
- // Same thing, but for reduce tasks
|
|
|
- //
|
|
|
- if ((numReduces < maxCurrentTasks) &&
|
|
|
- (numReduces <= (avgReduces + TASK_ALLOC_EPSILON))) {
|
|
|
-
|
|
|
- int totalNeededReduces = 0;
|
|
|
- for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
|
|
|
- JobInProgress job = (JobInProgress) it.next();
|
|
|
- if (job.getStatus().getRunState() != JobStatus.RUNNING) {
|
|
|
- continue;
|
|
|
- }
|
|
|
+ //
|
|
|
+ // Same thing, but for reduce tasks
|
|
|
+ //
|
|
|
+ if ((numReduces < maxCurrentTasks) &&
|
|
|
+ (numReduces <= (avgReduces + TASK_ALLOC_EPSILON))) {
|
|
|
+
|
|
|
+ int totalNeededReduces = 0;
|
|
|
+ for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
|
|
|
+ JobInProgress job = (JobInProgress) it.next();
|
|
|
+ if (job.getStatus().getRunState() != JobStatus.RUNNING) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
- Task t = job.obtainNewReduceTask(taskTracker, tts);
|
|
|
- if (t != null) {
|
|
|
- return t;
|
|
|
- }
|
|
|
+ Task t = job.obtainNewReduceTask(taskTracker, tts);
|
|
|
+ if (t != null) {
|
|
|
+ return t;
|
|
|
+ }
|
|
|
|
|
|
- //
|
|
|
- // Beyond the highest-priority task, reserve a little
|
|
|
- // room for failures and speculative executions; don't
|
|
|
- // schedule tasks to the hilt.
|
|
|
- //
|
|
|
- totalNeededReduces += job.desiredReduces();
|
|
|
- double padding = 0;
|
|
|
- if (totalCapacity > MIN_SLOTS_FOR_PADDING) {
|
|
|
- padding = Math.min(maxCurrentTasks, totalNeededReduces * PAD_FRACTION);
|
|
|
- }
|
|
|
- if (totalNeededReduces + padding >= totalCapacity) {
|
|
|
- break;
|
|
|
+ //
|
|
|
+ // Beyond the highest-priority task, reserve a little
|
|
|
+ // room for failures and speculative executions; don't
|
|
|
+ // schedule tasks to the hilt.
|
|
|
+ //
|
|
|
+ totalNeededReduces += job.desiredReduces();
|
|
|
+ double padding = 0;
|
|
|
+ if (totalCapacity > MIN_SLOTS_FOR_PADDING) {
|
|
|
+ padding = Math.min(maxCurrentTasks, totalNeededReduces * PAD_FRACTION);
|
|
|
+ }
|
|
|
+ if (totalNeededReduces + padding >= totalCapacity) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -645,9 +695,31 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
////////////////////////////////////////////////////
|
|
|
// JobSubmissionProtocol
|
|
|
////////////////////////////////////////////////////
|
|
|
+ /**
|
|
|
+ * JobTracker.submitJob() kicks off a new job.
|
|
|
+ *
|
|
|
+ * Create a 'JobInProgress' object, which contains both JobProfile
|
|
|
+ * and JobStatus. Those two sub-objects are sometimes shipped outside
|
|
|
+ * of the JobTracker. But JobInProgress adds info that's useful for
|
|
|
+ * the JobTracker alone.
|
|
|
+ *
|
|
|
+ * We add the JIP to the jobInitQueue, which is processed
|
|
|
+ * asynchronously to handle split-computation and build up
|
|
|
+ * the right TaskTracker/Block mapping.
|
|
|
+ */
|
|
|
public synchronized JobStatus submitJob(String jobFile) throws IOException {
|
|
|
totalSubmissions++;
|
|
|
- JobInProgress job = createJob(jobFile);
|
|
|
+ JobInProgress job = new JobInProgress(jobFile, this, this.conf);
|
|
|
+ synchronized (jobs) {
|
|
|
+ synchronized (jobsByArrival) {
|
|
|
+ synchronized (jobInitQueue) {
|
|
|
+ jobs.put(job.getProfile().getJobId(), job);
|
|
|
+ jobsByArrival.add(job);
|
|
|
+ jobInitQueue.add(job);
|
|
|
+ jobInitQueue.notifyAll();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
return job.getStatus();
|
|
|
}
|
|
|
|
|
@@ -732,25 +804,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
return "" + Integer.toString(Math.abs(r.nextInt()),36);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * JobProfile createJob() kicks off a new job.
|
|
|
- * This function creates a job profile and also decomposes it into
|
|
|
- * tasks. The tasks are added to the unassignedTasks structure.
|
|
|
- * (The precise structure will change as we get more sophisticated about
|
|
|
- * task allocation.)
|
|
|
- *
|
|
|
- * Create a 'JobInProgress' object, which contains both JobProfile
|
|
|
- * and JobStatus. Those two sub-objects are sometimes shipped outside
|
|
|
- * of the JobTracker. But JobInProgress adds info that's useful for
|
|
|
- * the JobTracker alone.
|
|
|
- */
|
|
|
- JobInProgress createJob(String jobFile) throws IOException {
|
|
|
- JobInProgress job = new JobInProgress(jobFile, this, this.conf);
|
|
|
- jobs.put(job.getProfile().getJobId(), job);
|
|
|
- jobsByArrival.add(job);
|
|
|
- return job;
|
|
|
- }
|
|
|
-
|
|
|
////////////////////////////////////////////////////
|
|
|
// Methods to track all the TaskTrackers
|
|
|
////////////////////////////////////////////////////
|