|
@@ -70,7 +70,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
|
|
static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
|
|
public static enum State { INITIALIZING, RUNNING }
|
|
public static enum State { INITIALIZING, RUNNING }
|
|
State state = State.INITIALIZING;
|
|
State state = State.INITIALIZING;
|
|
-
|
|
|
|
|
|
+ private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* A client tried to submit a job before the Job Tracker was ready.
|
|
* A client tried to submit a job before the Job Tracker was ready.
|
|
*/
|
|
*/
|
|
@@ -109,7 +110,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
* @param conf configuration for the JobTracker.
|
|
* @param conf configuration for the JobTracker.
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- public static JobTracker startTracker(JobConf conf) throws IOException {
|
|
|
|
|
|
+ public static JobTracker startTracker(JobConf conf
|
|
|
|
+ ) throws IOException,
|
|
|
|
+ InterruptedException {
|
|
JobTracker result = null;
|
|
JobTracker result = null;
|
|
while (true) {
|
|
while (true) {
|
|
try {
|
|
try {
|
|
@@ -125,10 +128,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
LOG.warn("Error starting tracker: " +
|
|
LOG.warn("Error starting tracker: " +
|
|
StringUtils.stringifyException(e));
|
|
StringUtils.stringifyException(e));
|
|
}
|
|
}
|
|
- try {
|
|
|
|
- Thread.sleep(1000);
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- }
|
|
|
|
|
|
+ Thread.sleep(1000);
|
|
}
|
|
}
|
|
if (result != null) {
|
|
if (result != null) {
|
|
JobEndNotifier.startNotifier();
|
|
JobEndNotifier.startNotifier();
|
|
@@ -605,14 +605,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
// Some jobs are stored in a local system directory. We can delete
|
|
// Some jobs are stored in a local system directory. We can delete
|
|
// the files when we're done with the job.
|
|
// the files when we're done with the job.
|
|
static final String SUBDIR = "jobTracker";
|
|
static final String SUBDIR = "jobTracker";
|
|
- FileSystem fs;
|
|
|
|
- Path systemDir;
|
|
|
|
|
|
+ FileSystem fs = null;
|
|
|
|
+ Path systemDir = null;
|
|
private JobConf conf;
|
|
private JobConf conf;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Start the JobTracker process, listen on the indicated port
|
|
* Start the JobTracker process, listen on the indicated port
|
|
*/
|
|
*/
|
|
- JobTracker(JobConf conf) throws IOException {
|
|
|
|
|
|
+ JobTracker(JobConf conf) throws IOException, InterruptedException {
|
|
//
|
|
//
|
|
// Grab some static constants
|
|
// Grab some static constants
|
|
//
|
|
//
|
|
@@ -630,8 +630,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
// on startup, and can delete any files that we're done with
|
|
// on startup, and can delete any files that we're done with
|
|
this.conf = conf;
|
|
this.conf = conf;
|
|
JobConf jobConf = new JobConf(conf);
|
|
JobConf jobConf = new JobConf(conf);
|
|
- this.systemDir = jobConf.getSystemDir();
|
|
|
|
- this.fs = FileSystem.get(conf);
|
|
|
|
|
|
|
|
// Read the hosts/exclude files to restrict access to the jobtracker.
|
|
// Read the hosts/exclude files to restrict access to the jobtracker.
|
|
this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
|
|
this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
|
|
@@ -682,9 +680,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
this.infoPort = this.infoServer.getPort();
|
|
this.infoPort = this.infoServer.getPort();
|
|
this.conf.setInt("mapred.job.tracker.info.port", this.infoPort);
|
|
this.conf.setInt("mapred.job.tracker.info.port", this.infoPort);
|
|
LOG.info("JobTracker webserver: " + this.infoServer.getPort());
|
|
LOG.info("JobTracker webserver: " + this.infoServer.getPort());
|
|
|
|
+ this.systemDir = jobConf.getSystemDir();
|
|
|
|
|
|
while (true) {
|
|
while (true) {
|
|
try {
|
|
try {
|
|
|
|
+ // if we haven't contacted the namenode go ahead and do it
|
|
|
|
+ if (fs == null) {
|
|
|
|
+ fs = FileSystem.get(conf);
|
|
|
|
+ }
|
|
|
|
+ // clean up the system dir, which will only work if hdfs is out of
|
|
|
|
+ // safe mode
|
|
fs.delete(systemDir);
|
|
fs.delete(systemDir);
|
|
if (fs.mkdirs(systemDir)) {
|
|
if (fs.mkdirs(systemDir)) {
|
|
break;
|
|
break;
|
|
@@ -693,6 +698,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
} catch (IOException ie) {
|
|
} catch (IOException ie) {
|
|
LOG.info("problem cleaning system directory: " + systemDir, ie);
|
|
LOG.info("problem cleaning system directory: " + systemDir, ie);
|
|
}
|
|
}
|
|
|
|
+ Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
|
|
}
|
|
}
|
|
|
|
|
|
// Same with 'localDir' except it's always on the local disk.
|
|
// Same with 'localDir' except it's always on the local disk.
|
|
@@ -719,11 +725,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
/**
|
|
/**
|
|
* Run forever
|
|
* Run forever
|
|
*/
|
|
*/
|
|
- public void offerService() {
|
|
|
|
- try {
|
|
|
|
- this.interTrackerServer.join();
|
|
|
|
- } catch (InterruptedException ie) {
|
|
|
|
- }
|
|
|
|
|
|
+ public void offerService() throws InterruptedException {
|
|
|
|
+ this.interTrackerServer.join();
|
|
LOG.info("Stopped interTrackerServer");
|
|
LOG.info("Stopped interTrackerServer");
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1850,7 +1853,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
* Start the JobTracker process. This is used only for debugging. As a rule,
|
|
* Start the JobTracker process. This is used only for debugging. As a rule,
|
|
* JobTracker should be run as part of the DFS Namenode process.
|
|
* JobTracker should be run as part of the DFS Namenode process.
|
|
*/
|
|
*/
|
|
- public static void main(String argv[]) throws IOException, InterruptedException {
|
|
|
|
|
|
+ public static void main(String argv[]
|
|
|
|
+ ) throws IOException, InterruptedException {
|
|
StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG);
|
|
StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG);
|
|
if (argv.length != 0) {
|
|
if (argv.length != 0) {
|
|
System.out.println("usage: JobTracker");
|
|
System.out.println("usage: JobTracker");
|