|
@@ -166,8 +166,96 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
|
|
|
volatile boolean running = true;
|
|
|
|
|
|
+ /**
|
|
|
+ * TaskTracker internal class only.
|
|
|
+ * Manages the lists of good mapred local dirs and bad mapred local dirs.
|
|
|
+ */
|
|
|
+ public static class LocalStorage {
|
|
|
+ private List<String> goodLocalDirs = new ArrayList<String>();
|
|
|
+ private List<String> badLocalDirs = new ArrayList<String>();
|
|
|
+ private boolean diskFailed = false;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * TaskTracker internal only
|
|
|
+ */
|
|
|
+ public LocalStorage(String[] localDirs) {
|
|
|
+ for (String s : localDirs) {
|
|
|
+ goodLocalDirs.add(s);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return good mapred local dirs list
|
|
|
+ */
|
|
|
+ synchronized String[] getGoodLocalDirs() {
|
|
|
+ String[] rv = new String[goodLocalDirs.size()];
|
|
|
+ return goodLocalDirs.toArray(rv);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return good mapred local dirs list as a commma seperated string
|
|
|
+ */
|
|
|
+ synchronized String getGoodLocalDirsString() {
|
|
|
+ StringBuffer sb = new StringBuffer();
|
|
|
+ for (String s : goodLocalDirs) {
|
|
|
+ if (sb.length() > 0) {
|
|
|
+ sb.append(",");
|
|
|
+ }
|
|
|
+ sb.append(s);
|
|
|
+ }
|
|
|
+ return sb.toString();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return bad mapred local dirs list
|
|
|
+ */
|
|
|
+ synchronized String[] getBadLocalDirs() {
|
|
|
+ String[] rv = new String[badLocalDirs.size()];
|
|
|
+ return badLocalDirs.toArray(rv);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return true if a disk has failed since the last
|
|
|
+ * time this method was called
|
|
|
+ */
|
|
|
+ synchronized boolean isDiskFailed() {
|
|
|
+ boolean rv = diskFailed;
|
|
|
+ diskFailed = false;
|
|
|
+ return rv;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Check if the given local directories
|
|
|
+ * (and parent directories, if necessary) can be created.
|
|
|
+ * Updates the list of good mapred local dirs and the list of bad local
|
|
|
+ * dirs.
|
|
|
+ * @throws DiskErrorException if all local directories are not writable
|
|
|
+ */
|
|
|
+ synchronized void checkLocalDirs()
|
|
|
+ throws DiskErrorException {
|
|
|
+ for (String s : getGoodLocalDirs()) {
|
|
|
+ try {
|
|
|
+ DiskChecker.checkDir(new File(s));
|
|
|
+ } catch(DiskErrorException e) {
|
|
|
+ LOG.warn("Task Tracker localdir error " + e.getMessage()
|
|
|
+ + ", removing from good locadirs");
|
|
|
+ goodLocalDirs.remove(s);
|
|
|
+ badLocalDirs.add(s);
|
|
|
+ diskFailed = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // no good local dirs ?
|
|
|
+ if(goodLocalDirs.size() < 1) {
|
|
|
+ throw new DiskErrorException(
|
|
|
+ "All mapred local directories are not writable.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private LocalStorage localStorage;
|
|
|
+ private long lastCheckDirsTime;
|
|
|
private LocalDirAllocator localDirAllocator;
|
|
|
- private String[] localdirs;
|
|
|
String taskTrackerName;
|
|
|
String localHostname;
|
|
|
InetSocketAddress jobTrackAddr;
|
|
@@ -315,6 +403,19 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
*/
|
|
|
private NodeHealthCheckerService healthChecker;
|
|
|
|
|
|
+ /**
|
|
|
+ * Configuration property for disk health check interval in milli seconds.
|
|
|
+ * Currently, configuring this to a value smaller than the heartbeat interval
|
|
|
+ * is equivalent to setting this to heartbeat interval value.
|
|
|
+ */
|
|
|
+ static final String DISK_HEALTH_CHECK_INTERVAL_PROPERTY =
|
|
|
+ "mapred.disk.healthChecker.interval";
|
|
|
+ /**
|
|
|
+ * How often TaskTracker needs to check the health of its disks.
|
|
|
+ * Default value is {@link MRConstants#DEFAULT_DISK_HEALTH_CHECK_INTERVAL}
|
|
|
+ */
|
|
|
+ private long diskHealthCheckInterval;
|
|
|
+
|
|
|
/*
|
|
|
* A list of commitTaskActions for whom commit response has been received
|
|
|
*/
|
|
@@ -579,7 +680,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
private void deleteUserDirectories(Configuration conf) throws IOException {
|
|
|
- for(String root: localdirs) {
|
|
|
+ for(String root: localStorage.getGoodLocalDirs()) {
|
|
|
for(FileStatus status: localFs.listStatus(new Path(root, SUBDIR))) {
|
|
|
String owner = status.getOwner();
|
|
|
String path = status.getPath().getName();
|
|
@@ -614,23 +715,34 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
(fConf.get("mapred.tasktracker.dns.interface","default"),
|
|
|
fConf.get("mapred.tasktracker.dns.nameserver","default"));
|
|
|
}
|
|
|
-
|
|
|
- //check local disk
|
|
|
- checkLocalDirs((localdirs = this.fConf.getLocalDirs()));
|
|
|
+
|
|
|
+ localStorage.checkLocalDirs();
|
|
|
+ if (localStorage.isDiskFailed()) {
|
|
|
+ // Ignore current disk failures. They are being handled now.
|
|
|
+ }
|
|
|
+ String dirs = localStorage.getGoodLocalDirsString();
|
|
|
+ fConf.setStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY, dirs);
|
|
|
+ LOG.info("Good mapred local directories are: " + dirs);
|
|
|
+ taskController.setConf(fConf);
|
|
|
+ // Setup task controller so that deletion of user dirs happens properly
|
|
|
+ taskController.setup(localDirAllocator, localStorage);
|
|
|
+ server.setAttribute("conf", fConf);
|
|
|
+
|
|
|
deleteUserDirectories(fConf);
|
|
|
+
|
|
|
fConf.deleteLocalFiles(SUBDIR);
|
|
|
final FsPermission ttdir = FsPermission.createImmutable((short) 0755);
|
|
|
- for (String s : localdirs) {
|
|
|
+ for (String s : localStorage.getGoodLocalDirs()) {
|
|
|
localFs.mkdirs(new Path(s, SUBDIR), ttdir);
|
|
|
}
|
|
|
fConf.deleteLocalFiles(TT_PRIVATE_DIR);
|
|
|
final FsPermission priv = FsPermission.createImmutable((short) 0700);
|
|
|
- for (String s : localdirs) {
|
|
|
+ for (String s : localStorage.getGoodLocalDirs()) {
|
|
|
localFs.mkdirs(new Path(s, TT_PRIVATE_DIR), priv);
|
|
|
}
|
|
|
fConf.deleteLocalFiles(TT_LOG_TMP_DIR);
|
|
|
final FsPermission pub = FsPermission.createImmutable((short) 0755);
|
|
|
- for (String s : localdirs) {
|
|
|
+ for (String s : localStorage.getGoodLocalDirs()) {
|
|
|
localFs.mkdirs(new Path(s, TT_LOG_TMP_DIR), pub);
|
|
|
}
|
|
|
|
|
@@ -726,7 +838,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
reduceLauncher.start();
|
|
|
|
|
|
// create a localizer instance
|
|
|
- setLocalizer(new Localizer(localFs, fConf.getLocalDirs()));
|
|
|
+ setLocalizer(new Localizer(localFs, localStorage.getGoodLocalDirs()));
|
|
|
|
|
|
//Start up node health checker service.
|
|
|
if (shouldStartHealthMonitor(this.fConf)) {
|
|
@@ -737,6 +849,13 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * @return TaskTracker's LocalStorage object
|
|
|
+ */
|
|
|
+ LocalStorage getLocalStorage() {
|
|
|
+ return localStorage;
|
|
|
+ }
|
|
|
+
|
|
|
private void createInstrumentation() {
|
|
|
Class<? extends TaskTrackerInstrumentation> metricsInst =
|
|
|
getInstrumentationClass(fConf);
|
|
@@ -1187,6 +1306,8 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
private void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
|
|
|
RunningJob rjob) throws IOException {
|
|
|
synchronized (tip) {
|
|
|
+ jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
|
|
|
+ localStorage.getGoodLocalDirsString());
|
|
|
tip.setJobConf(jobConf);
|
|
|
tip.setUGI(rjob.ugi);
|
|
|
tip.launchTask(rjob);
|
|
@@ -1285,6 +1406,8 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
"mapred.tasktracker.map.tasks.maximum", 2);
|
|
|
maxReduceSlots = conf.getInt(
|
|
|
"mapred.tasktracker.reduce.tasks.maximum", 2);
|
|
|
+ diskHealthCheckInterval = conf.getLong(DISK_HEALTH_CHECK_INTERVAL_PROPERTY,
|
|
|
+ DEFAULT_DISK_HEALTH_CHECK_INTERVAL);
|
|
|
UserGroupInformation.setConfiguration(originalConf);
|
|
|
aclsManager = new ACLsManager(conf, new JobACLsManager(conf), null);
|
|
|
this.jobTrackAddr = JobTracker.getAddress(conf);
|
|
@@ -1307,9 +1430,13 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
Class<? extends TaskController> taskControllerClass =
|
|
|
conf.getClass("mapred.task.tracker.task-controller",
|
|
|
DefaultTaskController.class, TaskController.class);
|
|
|
- taskController =
|
|
|
- (TaskController) ReflectionUtils.newInstance(taskControllerClass, conf);
|
|
|
- taskController.setup(localDirAllocator);
|
|
|
+
|
|
|
+ fConf = new JobConf(conf);
|
|
|
+ localStorage = new LocalStorage(fConf.getLocalDirs());
|
|
|
+ localStorage.checkLocalDirs();
|
|
|
+ taskController =
|
|
|
+ (TaskController) ReflectionUtils.newInstance(taskControllerClass, fConf);
|
|
|
+ taskController.setup(localDirAllocator, localStorage);
|
|
|
|
|
|
// create user log manager
|
|
|
setUserLogManager(new UserLogManager(conf, taskController));
|
|
@@ -1319,7 +1446,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
this.shuffleServerMetrics = ShuffleServerInstrumentation.create(this);
|
|
|
server.setAttribute("task.tracker", this);
|
|
|
server.setAttribute("local.file.system", local);
|
|
|
- server.setAttribute("conf", conf);
|
|
|
+
|
|
|
server.setAttribute("log", LOG);
|
|
|
server.setAttribute("localDirAllocator", localDirAllocator);
|
|
|
server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
|
|
@@ -1448,7 +1575,17 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
systemDirectory = new Path(dir);
|
|
|
systemFS = systemDirectory.getFileSystem(fConf);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ now = System.currentTimeMillis();
|
|
|
+ if (now > (lastCheckDirsTime + diskHealthCheckInterval)) {
|
|
|
+ localStorage.checkLocalDirs();
|
|
|
+ lastCheckDirsTime = now;
|
|
|
+ // If any of the good disks failed, re-init the task tracker
|
|
|
+ if (localStorage.isDiskFailed()) {
|
|
|
+ return State.STALE;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// Send the heartbeat and process the jobtracker's directives
|
|
|
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
|
|
|
|
|
@@ -1456,7 +1593,6 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
// next heartbeat
|
|
|
lastHeartbeat = System.currentTimeMillis();
|
|
|
|
|
|
-
|
|
|
// Check if the map-event list needs purging
|
|
|
Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
|
|
|
if (jobs.size() > 0) {
|
|
@@ -1615,7 +1751,6 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
localMinSpaceStart = minSpaceStart;
|
|
|
}
|
|
|
if (askForNewTask) {
|
|
|
- checkLocalDirs(fConf.getLocalDirs());
|
|
|
askForNewTask = enoughFreeSpace(localMinSpaceStart);
|
|
|
long freeDiskSpace = getFreeSpace();
|
|
|
long totVmem = getTotalVirtualMemoryOnTT();
|
|
@@ -1832,7 +1967,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
jobDir.substring(userDir.length()));
|
|
|
directoryCleanupThread.addToQueue(jobCleanup);
|
|
|
|
|
|
- for (String str : localdirs) {
|
|
|
+ for (String str : localStorage.getGoodLocalDirs()) {
|
|
|
Path ttPrivateJobDir = FileSystem.getLocal(fConf).makeQualified(
|
|
|
new Path(str, TaskTracker.getPrivateDirForJob(user, jobId.toString())));
|
|
|
PathDeletionContext ttPrivateJobCleanup =
|
|
@@ -1954,7 +2089,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
|
|
|
private long getFreeSpace() throws IOException {
|
|
|
long biggestSeenSoFar = 0;
|
|
|
- String[] localDirs = fConf.getLocalDirs();
|
|
|
+ String[] localDirs = localStorage.getGoodLocalDirs();
|
|
|
for (int i = 0; i < localDirs.length; i++) {
|
|
|
DF df = null;
|
|
|
if (localDirsDf.containsKey(localDirs[i])) {
|
|
@@ -3412,32 +3547,6 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
return fConf;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Check if the given local directories
|
|
|
- * (and parent directories, if necessary) can be created.
|
|
|
- * @param localDirs where the new TaskTracker should keep its local files.
|
|
|
- * @throws DiskErrorException if all local directories are not writable
|
|
|
- */
|
|
|
- private static void checkLocalDirs(String[] localDirs)
|
|
|
- throws DiskErrorException {
|
|
|
- boolean writable = false;
|
|
|
-
|
|
|
- if (localDirs != null) {
|
|
|
- for (int i = 0; i < localDirs.length; i++) {
|
|
|
- try {
|
|
|
- DiskChecker.checkDir(new File(localDirs[i]));
|
|
|
- writable = true;
|
|
|
- } catch(DiskErrorException e) {
|
|
|
- LOG.warn("Task Tracker local " + e.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (!writable)
|
|
|
- throw new DiskErrorException(
|
|
|
- "all local directories are not writable");
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Is this task tracker idle?
|
|
|
* @return has this task tracker finished and cleaned up all of its tasks?
|