|
@@ -20,6 +20,7 @@ import org.apache.commons.logging.*;
|
|
|
import org.apache.hadoop.fs.*;
|
|
|
import org.apache.hadoop.ipc.*;
|
|
|
import org.apache.hadoop.util.*;
|
|
|
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
|
|
|
|
|
import java.io.*;
|
|
|
import java.net.*;
|
|
@@ -101,32 +102,6 @@ public class TaskTracker
|
|
|
taskCleanupThread.start();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Start with the local machine name, and the default JobTracker
|
|
|
- */
|
|
|
- public TaskTracker(JobConf conf) throws IOException {
|
|
|
- maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
|
|
|
- this.fConf = conf;
|
|
|
- this.jobTrackAddr = JobTracker.getAddress(conf);
|
|
|
- this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000);
|
|
|
- this.mapOutputFile = new MapOutputFile();
|
|
|
- this.mapOutputFile.setConf(conf);
|
|
|
- int httpPort = conf.getInt("tasktracker.http.port", 50060);
|
|
|
- StatusHttpServer server = new StatusHttpServer("task", httpPort, true);
|
|
|
- int workerThreads = conf.getInt("tasktracker.http.threads", 40);
|
|
|
- server.setThreads(1, workerThreads);
|
|
|
- server.start();
|
|
|
- this.httpPort = server.getPort();
|
|
|
- // let the jsp pages get to the task tracker, config, and other relevant
|
|
|
- // objects
|
|
|
- FileSystem local = FileSystem.getNamed("local", conf);
|
|
|
- server.setAttribute("task.tracker", this);
|
|
|
- server.setAttribute("local.file.system", local);
|
|
|
- server.setAttribute("conf", conf);
|
|
|
- server.setAttribute("log", LOG);
|
|
|
- initialize();
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Do the real constructor work here. It's in a separate method
|
|
|
* so we can call it again and "recycle" the object after calling
|
|
@@ -135,6 +110,8 @@ public class TaskTracker
|
|
|
synchronized void initialize() throws IOException {
|
|
|
this.localHostname = InetAddress.getLocalHost().getHostName();
|
|
|
|
|
|
+ //check local disk
|
|
|
+ checkLocalDirs(this.fConf.getLocalDirs());
|
|
|
fConf.deleteLocalFiles(SUBDIR);
|
|
|
|
|
|
// Clear out state tables
|
|
@@ -218,6 +195,32 @@ public class TaskTracker
|
|
|
this.mapOutputFile.cleanupStorage();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Start with the local machine name, and the default JobTracker
|
|
|
+ */
|
|
|
+ public TaskTracker(JobConf conf) throws IOException {
|
|
|
+ maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
|
|
|
+ this.fConf = conf;
|
|
|
+ this.jobTrackAddr = JobTracker.getAddress(conf);
|
|
|
+ this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000);
|
|
|
+ this.mapOutputFile = new MapOutputFile();
|
|
|
+ this.mapOutputFile.setConf(conf);
|
|
|
+ int httpPort = conf.getInt("tasktracker.http.port", 50060);
|
|
|
+ StatusHttpServer server = new StatusHttpServer("task", httpPort, true);
|
|
|
+ int workerThreads = conf.getInt("tasktracker.http.threads", 40);
|
|
|
+ server.setThreads(1, workerThreads);
|
|
|
+ server.start();
|
|
|
+ this.httpPort = server.getPort();
|
|
|
+ // let the jsp pages get to the task tracker, config, and other relevant
|
|
|
+ // objects
|
|
|
+ FileSystem local = FileSystem.getNamed("local", conf);
|
|
|
+ server.setAttribute("task.tracker", this);
|
|
|
+ server.setAttribute("local.file.system", local);
|
|
|
+ server.setAttribute("conf", conf);
|
|
|
+ server.setAttribute("log", LOG);
|
|
|
+ initialize();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* The connection to the JobTracker, used by the TaskRunner
|
|
|
* for locating remote files.
|
|
@@ -287,11 +290,17 @@ public class TaskTracker
|
|
|
//
|
|
|
try {
|
|
|
if (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) {
|
|
|
+ checkLocalDirs(fConf.getLocalDirs());
|
|
|
Task t = jobClient.pollForNewTask(taskTrackerName);
|
|
|
if (t != null) {
|
|
|
startNewTask(t);
|
|
|
}
|
|
|
}
|
|
|
+ } catch (DiskErrorException de ) {
|
|
|
+ LOG.warn("Exiting task tracker because "+de.getMessage());
|
|
|
+ jobClient.reportTaskTrackerError(taskTrackerName,
|
|
|
+ "DiskErrorException", de.getMessage());
|
|
|
+ return STALE_STATE;
|
|
|
} catch (IOException ie) {
|
|
|
LOG.info("Problem launching task: " +
|
|
|
StringUtils.stringifyException(ie));
|
|
@@ -913,6 +922,33 @@ public class TaskTracker
|
|
|
return fConf;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Check if the given local directories
|
|
|
+ * (and parent directories, if necessary) can be created.
|
|
|
+ * @param localDirs where the new TaskTracker should keep its local files.
|
|
|
+ * @throws DiskErrorException if all local directories are not writable
|
|
|
+ * @author hairong
|
|
|
+ */
|
|
|
+ private static void checkLocalDirs( String[] localDirs )
|
|
|
+ throws DiskErrorException {
|
|
|
+ boolean writable = false;
|
|
|
+
|
|
|
+ if( localDirs != null ) {
|
|
|
+ for (int i = 0; i < localDirs.length; i++) {
|
|
|
+ try {
|
|
|
+ DiskChecker.checkDir( new File(localDirs[i]) );
|
|
|
+ writable = true;
|
|
|
+ } catch( DiskErrorException e ) {
|
|
|
+ LOG.warn("Task Tracker local " + e.getMessage() );
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if( !writable )
|
|
|
+ throw new DiskErrorException(
|
|
|
+ "all local directories are not writable" );
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Start the TaskTracker, point toward the indicated JobTracker
|
|
|
*/
|
|
@@ -922,7 +958,12 @@ public class TaskTracker
|
|
|
System.exit(-1);
|
|
|
}
|
|
|
|
|
|
- JobConf conf=new JobConf();
|
|
|
- new TaskTracker(conf).run();
|
|
|
+ try {
|
|
|
+ JobConf conf=new JobConf();
|
|
|
+ new TaskTracker(conf).run();
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn( "Can not start task tracker because "+e.getMessage());
|
|
|
+ System.exit(-1);
|
|
|
+ }
|
|
|
}
|
|
|
}
|