|
@@ -65,6 +65,18 @@ public class TaskTracker
|
|
int mapTotal = 0;
|
|
int mapTotal = 0;
|
|
int reduceTotal = 0;
|
|
int reduceTotal = 0;
|
|
boolean justStarted = true;
|
|
boolean justStarted = true;
|
|
|
|
+
|
|
|
|
+ //dir -> DF
|
|
|
|
+ Map localDirsDf = new HashMap();
|
|
|
|
+ long minSpaceStart = 0;
|
|
|
|
+ //must have this much space free to start new tasks
|
|
|
|
+ boolean acceptNewTasks = true;
|
|
|
|
+ long minSpaceKill = 0;
|
|
|
|
+ //if we run under this limit, kill one task
|
|
|
|
+ //and make sure we never receive any new jobs
|
|
|
|
+ //until all the old tasks have been cleaned up.
|
|
|
|
+ //this is if a machine is so full it's only good
|
|
|
|
+ //for serving map output to the other nodes
|
|
|
|
|
|
static Random r = new Random();
|
|
static Random r = new Random();
|
|
FileSystem fs = null;
|
|
FileSystem fs = null;
|
|
@@ -119,7 +131,12 @@ public class TaskTracker
|
|
this.runningTasks = new TreeMap();
|
|
this.runningTasks = new TreeMap();
|
|
this.mapTotal = 0;
|
|
this.mapTotal = 0;
|
|
this.reduceTotal = 0;
|
|
this.reduceTotal = 0;
|
|
-
|
|
|
|
|
|
+ this.acceptNewTasks = true;
|
|
|
|
+
|
|
|
|
+ this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
|
|
|
|
+ this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
|
|
|
|
+
|
|
|
|
+
|
|
// port numbers
|
|
// port numbers
|
|
this.taskReportPort = this.fConf.getInt("mapred.task.tracker.report.port", 50050);
|
|
this.taskReportPort = this.fConf.getInt("mapred.task.tracker.report.port", 50050);
|
|
|
|
|
|
@@ -331,11 +348,14 @@ public class TaskTracker
|
|
// Check if we should create a new Task
|
|
// Check if we should create a new Task
|
|
//
|
|
//
|
|
try {
|
|
try {
|
|
- if (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) {
|
|
|
|
|
|
+ if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) && acceptNewTasks) {
|
|
checkLocalDirs(fConf.getLocalDirs());
|
|
checkLocalDirs(fConf.getLocalDirs());
|
|
- Task t = jobClient.pollForNewTask(taskTrackerName);
|
|
|
|
- if (t != null) {
|
|
|
|
- startNewTask(t);
|
|
|
|
|
|
+
|
|
|
|
+ if (enoughFreeSpace(minSpaceStart)) {
|
|
|
|
+ Task t = jobClient.pollForNewTask(taskTrackerName);
|
|
|
|
+ if (t != null) {
|
|
|
|
+ startNewTask(t);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} catch (DiskErrorException de ) {
|
|
} catch (DiskErrorException de ) {
|
|
@@ -403,12 +423,99 @@ public class TaskTracker
|
|
LOG.info("Problem getting closed tasks: " +
|
|
LOG.info("Problem getting closed tasks: " +
|
|
StringUtils.stringifyException(ie));
|
|
StringUtils.stringifyException(ie));
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ //Check if we're dangerously low on disk space
|
|
|
|
+ // If so, kill jobs to free up space and make sure
|
|
|
|
+ // we don't accept any new tasks
|
|
|
|
+ // Try killing the reduce jobs first, since I believe they
|
|
|
|
+ // use up most space
|
|
|
|
+ // Then pick the one with least progress
|
|
|
|
+
|
|
|
|
+ if (!enoughFreeSpace(minSpaceKill)) {
|
|
|
|
+ acceptNewTasks=false;
|
|
|
|
+ //we give up! do not accept new tasks until
|
|
|
|
+ //all the ones running have finished and they're all cleared up
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ TaskInProgress killMe = null;
|
|
|
|
+
|
|
|
|
+ for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
|
|
|
|
+ TaskInProgress tip = (TaskInProgress) it.next();
|
|
|
|
+ if ((tip.getRunState() == TaskStatus.RUNNING) &&
|
|
|
|
+ !tip.wasKilled) {
|
|
|
|
+
|
|
|
|
+ if (killMe == null) {
|
|
|
|
+ killMe = tip;
|
|
|
|
+
|
|
|
|
+ } else if (!tip.getTask().isMapTask()) {
|
|
|
|
+ //reduce task, give priority
|
|
|
|
+ if (killMe.getTask().isMapTask() ||
|
|
|
|
+ (tip.getTask().getProgress().get() <
|
|
|
|
+ killMe.getTask().getProgress().get())) {
|
|
|
|
+
|
|
|
|
+ killMe = tip;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ } else if (killMe.getTask().isMapTask() &&
|
|
|
|
+ tip.getTask().getProgress().get() <
|
|
|
|
+ killMe.getTask().getProgress().get()) {
|
|
|
|
+ //map task, only add if the progress is lower
|
|
|
|
+
|
|
|
|
+ killMe = tip;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (killMe!=null) {
|
|
|
|
+ String msg = "Tasktracker running out of space. Killing task.";
|
|
|
|
+ LOG.info(killMe.getTask().getTaskId() + ": " + msg);
|
|
|
|
+ killMe.reportDiagnosticInfo(msg);
|
|
|
|
+ try {
|
|
|
|
+ killMe.killAndCleanup(true);
|
|
|
|
+ } catch (IOException ie) {
|
|
|
|
+ LOG.info("Problem cleaning task up: " +
|
|
|
|
+ StringUtils.stringifyException(ie));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ //we've cleaned up, resume normal operation
|
|
|
|
+ if (!acceptNewTasks && tasks.isEmpty()) {
|
|
|
|
+ acceptNewTasks=true;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
|
|
+ * Check if all of the local directories have enough
|
|
|
|
+ * free space
|
|
|
|
+ *
|
|
|
|
+ * If not, do not try to get a new task assigned
|
|
|
|
+ * @return
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ private boolean enoughFreeSpace(long minSpace) throws IOException {
|
|
|
|
+ String[] localDirs = fConf.getLocalDirs();
|
|
|
|
+ for (int i = 0; i < localDirs.length; i++) {
|
|
|
|
+ DF df = null;
|
|
|
|
+ if (localDirsDf.containsKey(localDirs[i])) {
|
|
|
|
+ df = (DF) localDirsDf.get(localDirs[i]);
|
|
|
|
+ } else {
|
|
|
|
+ df = new DF(localDirs[i], fConf);
|
|
|
|
+ localDirsDf.put(localDirs[i], df);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (df.getAvailable() < minSpace)
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
* Start a new task.
|
|
* Start a new task.
|
|
* All exceptions are handled locally, so that we don't mess up the
|
|
* All exceptions are handled locally, so that we don't mess up the
|
|
* task tracker.
|
|
* task tracker.
|