|
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.mapred.TaskTracker;
|
|
|
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
|
|
|
import org.apache.hadoop.util.ProcfsBasedProcessTree;
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
|
|
|
/**
|
|
|
* Manages memory usage of tasks running under this TT. Kills any task-trees
|
|
@@ -163,76 +164,84 @@ class TaskMemoryManagerThread extends Thread {
|
|
|
// Now, check memory usage and kill any overflowing tasks
|
|
|
for (Iterator<Map.Entry<TaskAttemptID, ProcessTreeInfo>> it = processTreeInfoMap
|
|
|
.entrySet().iterator(); it.hasNext();) {
|
|
|
-
|
|
|
Map.Entry<TaskAttemptID, ProcessTreeInfo> entry = it.next();
|
|
|
TaskAttemptID tid = entry.getKey();
|
|
|
ProcessTreeInfo ptInfo = entry.getValue();
|
|
|
- String pId = ptInfo.getPID();
|
|
|
-
|
|
|
- // Initialize any uninitialized processTrees
|
|
|
- if (pId == null) {
|
|
|
- // get pid from pid-file
|
|
|
- pId = getPid(ptInfo.pidFile);
|
|
|
- if (pId != null) {
|
|
|
- // PID will be null, either if the pid file is yet to be created
|
|
|
- // or if the tip is finished and we removed pidFile, but the TIP
|
|
|
- // itself is still retained in runningTasks till successful
|
|
|
- // transmission to JT
|
|
|
-
|
|
|
- // create process tree object
|
|
|
- ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(pId);
|
|
|
- LOG.debug("Tracking ProcessTree " + pId + " for the first time");
|
|
|
-
|
|
|
- ptInfo.setPid(pId);
|
|
|
- ptInfo.setProcessTree(pt);
|
|
|
- processTreeInfoMap.put(tid, ptInfo);
|
|
|
+ try {
|
|
|
+ String pId = ptInfo.getPID();
|
|
|
+
|
|
|
+ // Initialize any uninitialized processTrees
|
|
|
+ if (pId == null) {
|
|
|
+ // get pid from pid-file
|
|
|
+ pId = getPid(ptInfo.pidFile);
|
|
|
+ if (pId != null) {
|
|
|
+ // PID will be null, either if the pid file is yet to be created
|
|
|
+ // or if the tip is finished and we removed pidFile, but the TIP
|
|
|
+ // itself is still retained in runningTasks till successful
|
|
|
+ // transmission to JT
|
|
|
+
|
|
|
+ // create process tree object
|
|
|
+ ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(pId);
|
|
|
+ LOG.debug("Tracking ProcessTree " + pId + " for the first time");
|
|
|
+
|
|
|
+ ptInfo.setPid(pId);
|
|
|
+ ptInfo.setProcessTree(pt);
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- // End of initializing any uninitialized processTrees
|
|
|
+ // End of initializing any uninitialized processTrees
|
|
|
|
|
|
- if (pId == null) {
|
|
|
- continue; // processTree cannot be tracked
|
|
|
- }
|
|
|
+ if (pId == null) {
|
|
|
+ continue; // processTree cannot be tracked
|
|
|
+ }
|
|
|
|
|
|
- LOG.debug("Constructing ProcessTree for : PID = " + pId + " TID = "
|
|
|
- + tid);
|
|
|
- ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
|
|
|
- pTree = pTree.getProcessTree(); // get the updated process-tree
|
|
|
- ptInfo.setProcessTree(pTree); // update ptInfo with proces-tree of
|
|
|
- // updated state
|
|
|
- long currentMemUsage = pTree.getCumulativeVmem();
|
|
|
- long limit = ptInfo.getMemLimit();
|
|
|
- LOG.info("Memory usage of ProcessTree " + pId + " :" + currentMemUsage
|
|
|
- + "bytes. Limit : " + limit + "bytes");
|
|
|
-
|
|
|
- if (limit > taskTracker.getLimitMaxVMemPerTask()) {
|
|
|
- // TODO: With monitoring enabled and no scheduling based on
|
|
|
- // memory,users can seriously hijack the system by specifying memory
|
|
|
- // requirements well above the cluster wide limit. Ideally these jobs
|
|
|
- // should have been rejected by JT/scheduler. Because we can't do
|
|
|
- // that, in the minimum we should fail the tasks and hence the job.
|
|
|
- LOG.warn("Task " + tid
|
|
|
- + " 's maxVmemPerTask is greater than TT's limitMaxVmPerTask");
|
|
|
- }
|
|
|
+ LOG.debug("Constructing ProcessTree for : PID = " + pId + " TID = "
|
|
|
+ + tid);
|
|
|
+ ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
|
|
|
+ pTree = pTree.getProcessTree(); // get the updated process-tree
|
|
|
+ ptInfo.setProcessTree(pTree); // update ptInfo with proces-tree of
|
|
|
+ // updated state
|
|
|
+ long currentMemUsage = pTree.getCumulativeVmem();
|
|
|
+ long limit = ptInfo.getMemLimit();
|
|
|
+ LOG.info("Memory usage of ProcessTree " + pId + " :"
|
|
|
+ + currentMemUsage + "bytes. Limit : " + limit + "bytes");
|
|
|
+
|
|
|
+ if (limit > taskTracker.getLimitMaxVMemPerTask()) {
|
|
|
+ // TODO: With monitoring enabled and no scheduling based on
|
|
|
+ // memory,users can seriously hijack the system by specifying memory
|
|
|
+ // requirements well above the cluster wide limit. Ideally these
|
|
|
+ // jobs
|
|
|
+ // should have been rejected by JT/scheduler. Because we can't do
|
|
|
+ // that, in the minimum we should fail the tasks and hence the job.
|
|
|
+ LOG.warn("Task " + tid
|
|
|
+ + " 's maxVmemPerTask is greater than TT's limitMaxVmPerTask");
|
|
|
+ }
|
|
|
|
|
|
- if (limit != JobConf.DISABLED_MEMORY_LIMIT
|
|
|
- && currentMemUsage > limit) {
|
|
|
- // Task (the root process) is still alive and overflowing memory.
|
|
|
- // Clean up.
|
|
|
- String msg = "TaskTree [pid=" + pId + ",tipID=" + tid
|
|
|
- + "] is running beyond memory-limits. Current usage : "
|
|
|
- + currentMemUsage + "bytes. Limit : " + limit + "bytes. Killing task.";
|
|
|
- LOG.warn(msg);
|
|
|
- taskTracker.cleanUpOverMemoryTask(tid, true, msg);
|
|
|
-
|
|
|
- // Now destroy the ProcessTree, remove it from monitoring map.
|
|
|
- pTree.destroy();
|
|
|
- it.remove();
|
|
|
- LOG.info("Removed ProcessTree with root " + pId);
|
|
|
- } else {
|
|
|
- // Accounting the total memory in usage for all tasks that are still
|
|
|
- // alive and within limits.
|
|
|
- memoryStillInUsage += currentMemUsage;
|
|
|
+ if (limit != JobConf.DISABLED_MEMORY_LIMIT
|
|
|
+ && currentMemUsage > limit) {
|
|
|
+ // Task (the root process) is still alive and overflowing memory.
|
|
|
+ // Clean up.
|
|
|
+ String msg =
|
|
|
+ "TaskTree [pid=" + pId + ",tipID=" + tid
|
|
|
+ + "] is running beyond memory-limits. Current usage : "
|
|
|
+ + currentMemUsage + "bytes. Limit : " + limit
|
|
|
+ + "bytes. Killing task.";
|
|
|
+ LOG.warn(msg);
|
|
|
+ taskTracker.cleanUpOverMemoryTask(tid, true, msg);
|
|
|
+
|
|
|
+ // Now destroy the ProcessTree, remove it from monitoring map.
|
|
|
+ pTree.destroy();
|
|
|
+ it.remove();
|
|
|
+ LOG.info("Removed ProcessTree with root " + pId);
|
|
|
+ } else {
|
|
|
+ // Accounting the total memory in usage for all tasks that are still
|
|
|
+ // alive and within limits.
|
|
|
+ memoryStillInUsage += currentMemUsage;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ // Log the exception and proceed to the next task.
|
|
|
+ LOG.warn("Uncaught exception in TaskMemoryManager "
|
|
|
+ + "while managing memory of " + tid + " : "
|
|
|
+ + StringUtils.stringifyException(e));
|
|
|
}
|
|
|
}
|
|
|
|