|
@@ -20,7 +20,6 @@ package org.apache.hadoop.mapred;
|
|
|
import java.io.File;
|
|
|
import java.io.FileInputStream;
|
|
|
import java.io.FileNotFoundException;
|
|
|
-import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.OutputStream;
|
|
|
import java.io.RandomAccessFile;
|
|
@@ -62,7 +61,6 @@ import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
|
|
|
import org.apache.hadoop.mapreduce.server.tasktracker.*;
|
|
|
import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
|
|
|
import org.apache.hadoop.fs.DF;
|
|
|
-import org.apache.hadoop.fs.FSDataInputStream;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
@@ -76,15 +74,12 @@ import org.apache.hadoop.io.SecureIOUtils;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
import org.apache.hadoop.ipc.RemoteException;
|
|
|
import org.apache.hadoop.ipc.Server;
|
|
|
-import org.apache.hadoop.mapred.QueueManager.QueueACL;
|
|
|
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
|
|
|
-import org.apache.hadoop.mapred.TaskController.DeletionContext;
|
|
|
import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
|
|
|
import org.apache.hadoop.mapred.TaskLog.LogName;
|
|
|
import org.apache.hadoop.mapred.TaskStatus.Phase;
|
|
|
import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
|
|
|
import org.apache.hadoop.mapred.pipes.Submitter;
|
|
|
-import org.apache.hadoop.mapreduce.JobContext;
|
|
|
import org.apache.hadoop.mapreduce.TaskType;
|
|
|
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
|
|
|
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
|
@@ -101,7 +96,6 @@ import org.apache.hadoop.util.ProcfsBasedProcessTree;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
|
-import org.apache.hadoop.util.RunJar;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.util.VersionInfo;
|
|
|
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
|
@@ -167,94 +161,71 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
volatile boolean running = true;
|
|
|
|
|
|
/**
|
|
|
- * TaskTracker internal class only.
|
|
|
- * Manages the lists of good mapred local dirs and bad mapred local dirs.
|
|
|
+ * Manages TT local storage directories.
|
|
|
*/
|
|
|
- public static class LocalStorage {
|
|
|
- private List<String> goodLocalDirs = new ArrayList<String>();
|
|
|
- private List<String> badLocalDirs = new ArrayList<String>();
|
|
|
- private boolean diskFailed = false;
|
|
|
+ static class LocalStorage {
|
|
|
+ private List<String> localDirs;
|
|
|
+ private int numFailures;
|
|
|
|
|
|
- /**
|
|
|
- * TaskTracker internal only
|
|
|
- */
|
|
|
- public LocalStorage(String[] localDirs) {
|
|
|
- for (String s : localDirs) {
|
|
|
- goodLocalDirs.add(s);
|
|
|
- }
|
|
|
+ public LocalStorage(String[] dirs) {
|
|
|
+ localDirs = new ArrayList<String>();
|
|
|
+ localDirs.addAll(Arrays.asList(dirs));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * @return good mapred local dirs list
|
|
|
+ * @return the current valid directories
|
|
|
*/
|
|
|
- synchronized String[] getGoodLocalDirs() {
|
|
|
- String[] rv = new String[goodLocalDirs.size()];
|
|
|
- return goodLocalDirs.toArray(rv);
|
|
|
+ synchronized String[] getDirs() {
|
|
|
+ return localDirs.toArray(new String[localDirs.size()]);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * @return good mapred local dirs list as a commma seperated string
|
|
|
+ * @return the current valid dirs as comma separated string
|
|
|
*/
|
|
|
- synchronized String getGoodLocalDirsString() {
|
|
|
- StringBuffer sb = new StringBuffer();
|
|
|
- for (String s : goodLocalDirs) {
|
|
|
- if (sb.length() > 0) {
|
|
|
- sb.append(",");
|
|
|
- }
|
|
|
- sb.append(s);
|
|
|
- }
|
|
|
- return sb.toString();
|
|
|
+ synchronized String getDirsString() {
|
|
|
+ return StringUtils.join(",", localDirs);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * @return bad mapred local dirs list
|
|
|
+ * @return the number of valid local directories
|
|
|
*/
|
|
|
- synchronized String[] getBadLocalDirs() {
|
|
|
- String[] rv = new String[badLocalDirs.size()];
|
|
|
- return badLocalDirs.toArray(rv);
|
|
|
+ synchronized int numDirs() {
|
|
|
+ return localDirs.size();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * @return true if a disk has failed since the last
|
|
|
- * time this method was called
|
|
|
+ * @return the number of directory failures
|
|
|
*/
|
|
|
- synchronized boolean isDiskFailed() {
|
|
|
- boolean rv = diskFailed;
|
|
|
- diskFailed = false;
|
|
|
- return rv;
|
|
|
+ synchronized int numFailures() {
|
|
|
+ return numFailures;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Check if the given local directories
|
|
|
- * (and parent directories, if necessary) can be created.
|
|
|
- * Updates the list of good mapred local dirs and the list of bad local
|
|
|
- * dirs.
|
|
|
- * @throws DiskErrorException if all local directories are not writable
|
|
|
+ * Check the current set of local directories, updating the list
|
|
|
+ * of valid directories if necessary.
|
|
|
+ * Each directory that fails the check is removed and counted as a failure.
|
|
|
+ * @throws DiskErrorException if no directories are writable
|
|
|
*/
|
|
|
- synchronized void checkLocalDirs()
|
|
|
- throws DiskErrorException {
|
|
|
- for (String s : getGoodLocalDirs()) {
|
|
|
+ synchronized void checkDirs() throws DiskErrorException {
|
|
|
+ // Iterate over a snapshot: removing from localDirs while iterating it
|
|
|
+ // directly would throw ConcurrentModificationException or skip entries.
|
|
|
+ for (String dir : getDirs()) {
|
|
|
try {
|
|
|
- DiskChecker.checkDir(new File(s));
|
|
|
- } catch(DiskErrorException e) {
|
|
|
- LOG.warn("Task Tracker localdir error " + e.getMessage()
|
|
|
- + ", removing from good locadirs");
|
|
|
- goodLocalDirs.remove(s);
|
|
|
- badLocalDirs.add(s);
|
|
|
- diskFailed = true;
|
|
|
+ DiskChecker.checkDir(new File(dir));
|
|
|
+ } catch (DiskErrorException de) {
|
|
|
+ LOG.warn("TaskTracker local dir " + dir + " error " +
|
|
|
+ de.getMessage() + ", removing from local dirs");
|
|
|
+ localDirs.remove(dir);
|
|
|
+ numFailures++;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- // no good local dirs ?
|
|
|
- if(goodLocalDirs.size() < 1) {
|
|
|
+ if (localDirs.isEmpty()) {
|
|
|
throw new DiskErrorException(
|
|
|
- "All mapred local directories are not writable.");
|
|
|
+ "No mapred local directories are writable");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private LocalStorage localStorage;
|
|
|
private long lastCheckDirsTime;
|
|
|
+ private int lastNumFailures;
|
|
|
private LocalDirAllocator localDirAllocator;
|
|
|
String taskTrackerName;
|
|
|
String localHostname;
|
|
@@ -680,7 +651,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
private void deleteUserDirectories(Configuration conf) throws IOException {
|
|
|
- for(String root: localStorage.getGoodLocalDirs()) {
|
|
|
+ for(String root: localStorage.getDirs()) {
|
|
|
for(FileStatus status: localFs.listStatus(new Path(root, SUBDIR))) {
|
|
|
String owner = status.getOwner();
|
|
|
String path = status.getPath().getName();
|
|
@@ -716,11 +687,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
fConf.get("mapred.tasktracker.dns.nameserver","default"));
|
|
|
}
|
|
|
|
|
|
- localStorage.checkLocalDirs();
|
|
|
- if (localStorage.isDiskFailed()) {
|
|
|
- // Ignore current disk failures. They are being handled now.
|
|
|
- }
|
|
|
- String dirs = localStorage.getGoodLocalDirsString();
|
|
|
+ final String dirs = localStorage.getDirsString();
|
|
|
fConf.setStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY, dirs);
|
|
|
LOG.info("Good mapred local directories are: " + dirs);
|
|
|
taskController.setConf(fConf);
|
|
@@ -730,23 +697,25 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
|
|
|
deleteUserDirectories(fConf);
|
|
|
|
|
|
+ // NB: deleteLocalFiles uses the configured local dirs, but does not
|
|
|
+ // fail if a local directory has failed.
|
|
|
fConf.deleteLocalFiles(SUBDIR);
|
|
|
final FsPermission ttdir = FsPermission.createImmutable((short) 0755);
|
|
|
- for (String s : localStorage.getGoodLocalDirs()) {
|
|
|
+ for (String s : localStorage.getDirs()) {
|
|
|
localFs.mkdirs(new Path(s, SUBDIR), ttdir);
|
|
|
}
|
|
|
fConf.deleteLocalFiles(TT_PRIVATE_DIR);
|
|
|
final FsPermission priv = FsPermission.createImmutable((short) 0700);
|
|
|
- for (String s : localStorage.getGoodLocalDirs()) {
|
|
|
+ for (String s : localStorage.getDirs()) {
|
|
|
localFs.mkdirs(new Path(s, TT_PRIVATE_DIR), priv);
|
|
|
}
|
|
|
fConf.deleteLocalFiles(TT_LOG_TMP_DIR);
|
|
|
final FsPermission pub = FsPermission.createImmutable((short) 0755);
|
|
|
- for (String s : localStorage.getGoodLocalDirs()) {
|
|
|
+ for (String s : localStorage.getDirs()) {
|
|
|
localFs.mkdirs(new Path(s, TT_LOG_TMP_DIR), pub);
|
|
|
}
|
|
|
// Create userlogs directory under all good mapred-local-dirs
|
|
|
- for (String s : localStorage.getGoodLocalDirs()) {
|
|
|
+ for (String s : localStorage.getDirs()) {
|
|
|
Path userLogsDir = new Path(s, TaskLog.USERLOGS_DIR_NAME);
|
|
|
if (!localFs.exists(userLogsDir)) {
|
|
|
localFs.mkdirs(userLogsDir, pub);
|
|
@@ -845,7 +814,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
reduceLauncher.start();
|
|
|
|
|
|
// create a localizer instance
|
|
|
- setLocalizer(new Localizer(localFs, localStorage.getGoodLocalDirs()));
|
|
|
+ setLocalizer(new Localizer(localFs, localStorage.getDirs()));
|
|
|
|
|
|
//Start up node health checker service.
|
|
|
if (shouldStartHealthMonitor(this.fConf)) {
|
|
@@ -856,13 +825,6 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * @return TaskTracker's LocalStorage object
|
|
|
- */
|
|
|
- LocalStorage getLocalStorage() {
|
|
|
- return localStorage;
|
|
|
- }
|
|
|
-
|
|
|
private void createInstrumentation() {
|
|
|
Class<? extends TaskTrackerInstrumentation> metricsInst =
|
|
|
getInstrumentationClass(fConf);
|
|
@@ -1316,7 +1278,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
RunningJob rjob) throws IOException {
|
|
|
synchronized (tip) {
|
|
|
jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
|
|
|
- localStorage.getGoodLocalDirsString());
|
|
|
+ localStorage.getDirsString());
|
|
|
tip.setJobConf(jobConf);
|
|
|
tip.setUGI(rjob.ugi);
|
|
|
tip.launchTask(rjob);
|
|
@@ -1406,14 +1368,6 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
fConf = conf;
|
|
|
}
|
|
|
|
|
|
- void setLocalStorage(LocalStorage in) {
|
|
|
- localStorage = in;
|
|
|
- }
|
|
|
-
|
|
|
- void setLocalDirAllocator(LocalDirAllocator in) {
|
|
|
- localDirAllocator = in;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Start with the local machine name, and the default JobTracker
|
|
|
*/
|
|
@@ -1451,10 +1405,11 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
|
|
|
fConf = new JobConf(conf);
|
|
|
localStorage = new LocalStorage(fConf.getLocalDirs());
|
|
|
- localStorage.checkLocalDirs();
|
|
|
+ localStorage.checkDirs();
|
|
|
taskController =
|
|
|
(TaskController) ReflectionUtils.newInstance(taskControllerClass, fConf);
|
|
|
taskController.setup(localDirAllocator, localStorage);
|
|
|
+ lastNumFailures = localStorage.numFailures();
|
|
|
|
|
|
// create user log manager
|
|
|
setUserLogManager(new UserLogManager(conf, taskController));
|
|
@@ -1605,10 +1560,12 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
|
|
|
now = System.currentTimeMillis();
|
|
|
if (now > (lastCheckDirsTime + diskHealthCheckInterval)) {
|
|
|
- localStorage.checkLocalDirs();
|
|
|
+ localStorage.checkDirs();
|
|
|
lastCheckDirsTime = now;
|
|
|
- // If any of the good disks failed, re-init the task tracker
|
|
|
- if (localStorage.isDiskFailed()) {
|
|
|
+ int numFailures = localStorage.numFailures();
|
|
|
+ // Re-init the task tracker if there were any new failures
|
|
|
+ if (numFailures > lastNumFailures) {
|
|
|
+ lastNumFailures = numFailures;
|
|
|
return State.STALE;
|
|
|
}
|
|
|
}
|
|
@@ -1994,7 +1951,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
jobDir.substring(userDir.length()));
|
|
|
directoryCleanupThread.addToQueue(jobCleanup);
|
|
|
|
|
|
- for (String str : localStorage.getGoodLocalDirs()) {
|
|
|
+ for (String str : localStorage.getDirs()) {
|
|
|
Path ttPrivateJobDir = FileSystem.getLocal(fConf).makeQualified(
|
|
|
new Path(str, TaskTracker.getPrivateDirForJob(user, jobId.toString())));
|
|
|
PathDeletionContext ttPrivateJobCleanup =
|
|
@@ -2116,7 +2073,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
|
|
|
private long getFreeSpace() throws IOException {
|
|
|
long biggestSeenSoFar = 0;
|
|
|
- String[] localDirs = localStorage.getGoodLocalDirs();
|
|
|
+ String[] localDirs = localStorage.getDirs();
|
|
|
for (int i = 0; i < localDirs.length; i++) {
|
|
|
DF df = null;
|
|
|
if (localDirsDf.containsKey(localDirs[i])) {
|