|
@@ -19,7 +19,9 @@ package org.apache.hadoop.mapred;
|
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.net.BindException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
+import java.net.UnknownHostException;
|
|
|
import java.text.NumberFormat;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
import java.util.ArrayList;
|
|
@@ -66,6 +68,17 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
static float TASK_ALLOC_EPSILON;
|
|
|
static float PAD_FRACTION;
|
|
|
static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
|
|
|
+ public static enum State { INITIALIZING, RUNNING }
|
|
|
+ State state = State.INITIALIZING;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Thrown when a client tries to submit a job before the JobTracker is ready.
|
|
|
+ */
|
|
|
+ public static class IllegalStateException extends IOException {
|
|
|
+ public IllegalStateException(String msg) {
|
|
|
+ super(msg);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* The maximum no. of 'completed' (successful/failed/killed)
|
|
@@ -85,9 +98,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
private int nextJobId = 1;
|
|
|
|
|
|
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobTracker");
|
|
|
-
|
|
|
- private static JobTracker tracker = null;
|
|
|
- private static boolean runTracker = true;
|
|
|
|
|
|
/**
|
|
|
* Start the JobTracker with given configuration.
|
|
@@ -99,17 +109,18 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* @param conf configuration for the JobTracker.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public static void startTracker(JobConf conf) throws IOException {
|
|
|
- if (tracker != null)
|
|
|
- throw new IOException("JobTracker already running.");
|
|
|
- runTracker = true;
|
|
|
- while (runTracker) {
|
|
|
+ public static JobTracker startTracker(JobConf conf) throws IOException {
|
|
|
+ JobTracker result = null;
|
|
|
+ while (true) {
|
|
|
try {
|
|
|
- tracker = new JobTracker(conf);
|
|
|
+ result = new JobTracker(conf);
|
|
|
break;
|
|
|
- } catch (VersionMismatch v) {
|
|
|
- // Can't recover from a version mismatch. Avoid the retry loop and re-throw
|
|
|
- throw v;
|
|
|
+ } catch (VersionMismatch e) {
|
|
|
+ throw e;
|
|
|
+ } catch (BindException e) {
|
|
|
+ throw e;
|
|
|
+ } catch (UnknownHostException e) {
|
|
|
+ throw e;
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Error starting tracker: " +
|
|
|
StringUtils.stringifyException(e));
|
|
@@ -117,25 +128,17 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
try {
|
|
|
Thread.sleep(1000);
|
|
|
} catch (InterruptedException e) {
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
- if (runTracker) {
|
|
|
+ if (result != null) {
|
|
|
JobEndNotifier.startNotifier();
|
|
|
- tracker.offerService();
|
|
|
}
|
|
|
+ return result;
|
|
|
}
|
|
|
|
|
|
- public static JobTracker getTracker() {
|
|
|
- return tracker;
|
|
|
- }
|
|
|
-
|
|
|
- public static void stopTracker() throws IOException {
|
|
|
- runTracker = false;
|
|
|
- if (tracker != null) {
|
|
|
- JobEndNotifier.stopNotifier();
|
|
|
- tracker.close();
|
|
|
- tracker = null;
|
|
|
- }
|
|
|
+ public void stopTracker() throws IOException {
|
|
|
+ JobEndNotifier.stopNotifier();
|
|
|
+ close();
|
|
|
}
|
|
|
|
|
|
public long getProtocolVersion(String protocol,
|
|
@@ -426,8 +429,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
private int numReduceTasksCompleted = 0;
|
|
|
private int numJobsSubmitted = 0;
|
|
|
private int numJobsCompleted = 0;
|
|
|
+ private JobTracker tracker;
|
|
|
|
|
|
- JobTrackerMetrics(JobConf conf) {
|
|
|
+ JobTrackerMetrics(JobTracker tracker, JobConf conf) {
|
|
|
String sessionId = conf.getSessionId();
|
|
|
// Initiate JVM Metrics
|
|
|
JvmMetrics.init("JobTracker", sessionId);
|
|
@@ -435,6 +439,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
MetricsContext context = MetricsUtil.getContext("mapred");
|
|
|
metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
|
|
|
metricsRecord.setTag("sessionId", sessionId);
|
|
|
+ this.tracker = tracker;
|
|
|
context.registerUpdater(this);
|
|
|
}
|
|
|
|
|
@@ -459,14 +464,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
numJobsCompleted = 0;
|
|
|
}
|
|
|
metricsRecord.update();
|
|
|
-
|
|
|
+
|
|
|
if (tracker != null) {
|
|
|
for (JobInProgress jip : tracker.getRunningJobs()) {
|
|
|
jip.updateMetrics();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
synchronized void launchMap() {
|
|
|
++numMapTasksLaunched;
|
|
|
}
|
|
@@ -629,13 +634,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
JobConf jobConf = new JobConf(conf);
|
|
|
this.systemDir = jobConf.getSystemDir();
|
|
|
this.fs = FileSystem.get(conf);
|
|
|
- fs.delete(systemDir);
|
|
|
- if (!fs.mkdirs(systemDir)) {
|
|
|
- throw new IOException("Mkdirs failed to create " + systemDir.toString());
|
|
|
- }
|
|
|
-
|
|
|
- // Same with 'localDir' except it's always on the local disk.
|
|
|
- jobConf.deleteLocalFiles(SUBDIR);
|
|
|
|
|
|
// Read the hosts/exclude files to restrict access to the jobtracker.
|
|
|
this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
|
|
@@ -648,23 +646,26 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
|
|
|
this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount, false, conf);
|
|
|
this.interTrackerServer.start();
|
|
|
- Properties p = System.getProperties();
|
|
|
- for (Iterator it = p.keySet().iterator(); it.hasNext();) {
|
|
|
- String key = (String) it.next();
|
|
|
- String val = (String) p.getProperty(key);
|
|
|
- LOG.info("Property '" + key + "' is " + val);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ Properties p = System.getProperties();
|
|
|
+ for (Iterator it = p.keySet().iterator(); it.hasNext();) {
|
|
|
+ String key = (String) it.next();
|
|
|
+ String val = (String) p.getProperty(key);
|
|
|
+ LOG.debug("Property '" + key + "' is " + val);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
|
|
|
this.infoBindAddress = conf.get("mapred.job.tracker.info.bindAddress","0.0.0.0");
|
|
|
- this.infoServer = new StatusHttpServer("job", infoBindAddress, infoPort, false);
|
|
|
- this.infoServer.start();
|
|
|
+ infoServer = new StatusHttpServer("job", infoBindAddress, infoPort, false);
|
|
|
+ infoServer.setAttribute("job.tracker", this);
|
|
|
+ infoServer.start();
|
|
|
|
|
|
this.startTime = System.currentTimeMillis();
|
|
|
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
|
|
|
trackerIdentifier = dateFormat.format(new Date());
|
|
|
|
|
|
- myMetrics = new JobTrackerMetrics(jobConf);
|
|
|
+ myMetrics = new JobTrackerMetrics(this, jobConf);
|
|
|
|
|
|
this.expireTrackersThread = new Thread(this.expireTrackers,
|
|
|
"expireTrackers");
|
|
@@ -683,6 +684,25 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
this.infoPort = this.infoServer.getPort();
|
|
|
this.conf.setInt("mapred.job.tracker.info.port", this.infoPort);
|
|
|
LOG.info("JobTracker webserver: " + this.infoServer.getPort());
|
|
|
+
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ fs.delete(systemDir);
|
|
|
+ if (fs.mkdirs(systemDir)) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ LOG.error("Mkdirs failed to create " + systemDir);
|
|
|
+ } catch (IOException ie) {
|
|
|
+ LOG.info("problem cleaning system directory: " + systemDir, ie);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Same with 'localDir' except it's always on the local disk.
|
|
|
+ jobConf.deleteLocalFiles(SUBDIR);
|
|
|
+ synchronized (this) {
|
|
|
+ state = State.RUNNING;
|
|
|
+ }
|
|
|
+ LOG.info("Starting RUNNING");
|
|
|
}
|
|
|
|
|
|
public static InetSocketAddress getAddress(Configuration conf) {
|
|
@@ -1455,14 +1475,22 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
// JobSubmissionProtocol
|
|
|
////////////////////////////////////////////////////
|
|
|
|
|
|
+ /**
|
|
|
+ * Fail with IllegalStateException unless the JobTracker state is RUNNING.
|
|
|
+ */
|
|
|
+ private synchronized void ensureRunning() throws IllegalStateException {
|
|
|
+ if (state != State.RUNNING) {
|
|
|
+ throw new IllegalStateException("Job tracker still initializing");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Allocates a new JobId string.
|
|
|
*/
|
|
|
- public String getNewJobId() {
|
|
|
- synchronized (this) {
|
|
|
- return "job_" + getTrackerIdentifier() + "_" +
|
|
|
+ public synchronized String getNewJobId() throws IOException {
|
|
|
+ ensureRunning();
|
|
|
+ return "job_" + getTrackerIdentifier() + "_" +
|
|
|
idFormat.format(nextJobId++);
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1478,6 +1506,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* the right TaskTracker/Block mapping.
|
|
|
*/
|
|
|
public synchronized JobStatus submitJob(String jobFile) throws IOException {
|
|
|
+ ensureRunning();
|
|
|
totalSubmissions++;
|
|
|
JobInProgress job = new JobInProgress(jobFile, this, this.conf);
|
|
|
synchronized (jobs) {
|
|
@@ -1526,7 +1555,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
return new ClusterStatus(taskTrackers.size(),
|
|
|
totalMaps,
|
|
|
totalReduces,
|
|
|
- maxCurrentTasks);
|
|
|
+ maxCurrentTasks,
|
|
|
+ state);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1816,7 +1846,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- startTracker(new JobConf());
|
|
|
+ JobTracker tracker = startTracker(new JobConf());
|
|
|
+ tracker.offerService();
|
|
|
} catch (Throwable e) {
|
|
|
LOG.fatal(StringUtils.stringifyException(e));
|
|
|
System.exit(-1);
|