|
@@ -18,9 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
-import java.io.OutputStream;
|
|
|
-import java.io.OutputStreamWriter;
|
|
|
-import java.io.Writer;
|
|
|
+import java.io.IOException;
|
|
|
import java.util.regex.Pattern;
|
|
|
import java.util.regex.Matcher;
|
|
|
|
|
@@ -28,22 +26,28 @@ import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
-import org.apache.hadoop.examples.WordCount;
|
|
|
+import org.apache.hadoop.examples.SleepJob;
|
|
|
import org.apache.hadoop.util.ProcfsBasedProcessTree;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.util.ToolRunner;
|
|
|
-import org.apache.hadoop.util.Tool;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
-import org.apache.hadoop.fs.Path;
|
|
|
|
|
|
import junit.framework.TestCase;
|
|
|
|
|
|
+/**
|
|
|
+ * Test class to verify memory management of tasks.
|
|
|
+ */
|
|
|
public class TestTaskTrackerMemoryManager extends TestCase {
|
|
|
|
|
|
- private static final Log LOG = LogFactory.getLog(TestTaskTrackerMemoryManager.class);
|
|
|
+ private static final Log LOG =
|
|
|
+ LogFactory.getLog(TestTaskTrackerMemoryManager.class);
|
|
|
private MiniDFSCluster miniDFSCluster;
|
|
|
private MiniMRCluster miniMRCluster;
|
|
|
|
|
|
+ private String taskOverLimitPatternString =
|
|
|
+ "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond memory-limits. "
|
|
|
+ + "Current usage : [0-9]*kB. Limit : %skB. Killing task.";
|
|
|
+
|
|
|
private void startCluster(JobConf conf) throws Exception {
|
|
|
miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
|
|
|
FileSystem fileSys = miniDFSCluster.getFileSystem();
|
|
@@ -61,55 +65,14 @@ public class TestTaskTrackerMemoryManager extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void runWordCount(JobConf conf) throws Exception {
|
|
|
- Path input = new Path("input.txt");
|
|
|
- Path output = new Path("output");
|
|
|
-
|
|
|
- OutputStream os = miniDFSCluster.getFileSystem().create(input);
|
|
|
- Writer wr = new OutputStreamWriter(os);
|
|
|
- wr.write("hello1\n");
|
|
|
- wr.write("hello2\n");
|
|
|
- wr.write("hello3\n");
|
|
|
- wr.write("hello4\n");
|
|
|
- wr.close();
|
|
|
-
|
|
|
- Tool WordCount = new WordCount();
|
|
|
- if (conf != null) {
|
|
|
- WordCount.setConf(conf);
|
|
|
- }
|
|
|
- ToolRunner.run(WordCount, new String[] { input.toString(),
|
|
|
- output.toString() });
|
|
|
+ private void runSleepJob(JobConf conf) throws Exception {
|
|
|
+ String[] args = { "-m", "3", "-r", "1", "-mt", "3000", "-rt", "1000" };
|
|
|
+ ToolRunner.run(conf, new SleepJob(), args);
|
|
|
}
|
|
|
|
|
|
- public void testNormalTaskAndLimitedTT() throws Exception {
|
|
|
- // Run the test only if memory management is enabled
|
|
|
-
|
|
|
- try {
|
|
|
- if (!ProcfsBasedProcessTree.isAvailable()) {
|
|
|
- LOG.info("Currently ProcessTree has only one implementation "
|
|
|
- + "ProcfsBasedProcessTree, which is not available on this "
|
|
|
- + "system. Not testing");
|
|
|
- return;
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.info(StringUtils.stringifyException(e));
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- Pattern diagMsgPattern = Pattern
|
|
|
- .compile("TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond "
|
|
|
- + "memory-limits. Current usage : [0-9]*kB. Limit : [0-9]*kB. Killing task.");
|
|
|
- Matcher mat = null;
|
|
|
-
|
|
|
- // Start cluster with proper configuration.
|
|
|
- JobConf fConf = new JobConf();
|
|
|
-
|
|
|
- fConf.setLong("mapred.tasktracker.tasks.maxmemory",
|
|
|
- Long.valueOf(10000000000L)); // Fairly large value for WordCount to succeed
|
|
|
- startCluster(fConf);
|
|
|
-
|
|
|
+ private void runAndCheckSuccessfulJob(JobConf conf)
|
|
|
+ throws IOException {
|
|
|
// Set up job.
|
|
|
- JobConf conf = new JobConf();
|
|
|
JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
|
|
|
conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
|
|
|
+ jt.getTrackerPort());
|
|
@@ -118,10 +81,14 @@ public class TestTaskTrackerMemoryManager extends TestCase {
|
|
|
+ nn.getNameNodeAddress().getHostName() + ":"
|
|
|
+ nn.getNameNodeAddress().getPort());
|
|
|
|
|
|
+ Pattern taskOverLimitPattern =
|
|
|
+ Pattern.compile(String.format(taskOverLimitPatternString, "[0-9]*"));
|
|
|
+ Matcher mat = null;
|
|
|
+
|
|
|
// Start the job.
|
|
|
boolean success = true;
|
|
|
try {
|
|
|
- runWordCount(conf);
|
|
|
+ runSleepJob(conf);
|
|
|
success = true;
|
|
|
} catch (Exception e) {
|
|
|
success = false;
|
|
@@ -130,8 +97,6 @@ public class TestTaskTrackerMemoryManager extends TestCase {
|
|
|
// Job has to succeed
|
|
|
assertTrue(success);
|
|
|
|
|
|
- // Alas, we don't have a way to get job id/Task completion events from
|
|
|
- // WordCount
|
|
|
JobClient jClient = new JobClient(conf);
|
|
|
JobStatus[] jStatus = jClient.getAllJobs();
|
|
|
JobStatus js = jStatus[0]; // Our only job
|
|
@@ -141,12 +106,12 @@ public class TestTaskTrackerMemoryManager extends TestCase {
|
|
|
TaskCompletionEvent[] taskComplEvents = rj.getTaskCompletionEvents(0);
|
|
|
|
|
|
for (TaskCompletionEvent tce : taskComplEvents) {
|
|
|
- String[] diagnostics = jClient.jobSubmitClient.getTaskDiagnostics(tce
|
|
|
- .getTaskAttemptId());
|
|
|
+ String[] diagnostics =
|
|
|
+ jClient.jobSubmitClient.getTaskDiagnostics(tce.getTaskAttemptId());
|
|
|
|
|
|
if (diagnostics != null) {
|
|
|
for (String str : diagnostics) {
|
|
|
- mat = diagMsgPattern.matcher(str);
|
|
|
+ mat = taskOverLimitPattern.matcher(str);
|
|
|
// The error pattern shouldn't be there in any TIP's diagnostics
|
|
|
assertFalse(mat.find());
|
|
|
}
|
|
@@ -154,34 +119,123 @@ public class TestTaskTrackerMemoryManager extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void testOOMTaskAndLimitedTT() throws Exception {
|
|
|
-
|
|
|
- // Run the test only if memory management is enabled
|
|
|
-
|
|
|
+ private boolean isProcfsBasedTreeAvailable() {
|
|
|
try {
|
|
|
if (!ProcfsBasedProcessTree.isAvailable()) {
|
|
|
LOG.info("Currently ProcessTree has only one implementation "
|
|
|
+ "ProcfsBasedProcessTree, which is not available on this "
|
|
|
+ "system. Not testing");
|
|
|
- return;
|
|
|
+ return false;
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
LOG.info(StringUtils.stringifyException(e));
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test for verifying that nothing is killed when memory management is
|
|
|
+ * disabled on the TT, even when the tasks run over their limits.
|
|
|
+ *
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ public void testTTLimitsDisabled()
|
|
|
+ throws Exception {
|
|
|
+ // Run the test only if memory management is enabled
|
|
|
+ if (!isProcfsBasedTreeAvailable()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ JobConf conf = new JobConf();
|
|
|
+ // Task-memory management disabled by default.
|
|
|
+ startCluster(conf);
|
|
|
+ long PER_TASK_LIMIT = 100L; // Doesn't matter how low.
|
|
|
+ conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
|
|
|
+ runAndCheckSuccessfulJob(conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test for verifying that tasks with no limits, with the cumulative usage
|
|
|
+ * still under TT's limits, succeed.
|
|
|
+ *
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ public void testTasksWithNoLimits()
|
|
|
+ throws Exception {
|
|
|
+ // Run the test only if memory management is enabled
|
|
|
+ if (!isProcfsBasedTreeAvailable()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Start cluster with proper configuration.
|
|
|
+ JobConf fConf = new JobConf();
|
|
|
+
|
|
|
+ // Fairly large value for sleepJob to succeed
|
|
|
+ fConf.setLong("mapred.tasktracker.tasks.maxmemory", 10000000000L);
|
|
|
+ startCluster(fConf);
|
|
|
+
|
|
|
+ // Set up job.
|
|
|
+ JobConf conf = new JobConf();
|
|
|
+ runAndCheckSuccessfulJob(conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test for verifying that tasks within limits, with the cumulative usage also
|
|
|
+ * under TT's limits succeed.
|
|
|
+ *
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ public void testTasksWithinLimits()
|
|
|
+ throws Exception {
|
|
|
+ // Run the test only if memory management is enabled
|
|
|
+ if (!isProcfsBasedTreeAvailable()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ long PER_TASK_LIMIT = 10000000000L; // Large so sleepjob goes through.
|
|
|
+ long TASK_TRACKER_LIMIT = 10000000000L; // Large so as to fit total usage
|
|
|
+
|
|
|
+ // Start cluster with proper configuration.
|
|
|
+ JobConf fConf = new JobConf();
|
|
|
+
|
|
|
+ // Fairly large value for sleepjob to succeed
|
|
|
+ fConf.setLong("mapred.tasktracker.tasks.maxmemory", TASK_TRACKER_LIMIT);
|
|
|
+ startCluster(fConf);
|
|
|
+
|
|
|
+ JobConf conf = new JobConf();
|
|
|
+ conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
|
|
|
+ runAndCheckSuccessfulJob(conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test for verifying that tasks that go beyond limits, though the cumulative
|
|
|
+ * usage is under TT's limits, get killed.
|
|
|
+ *
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ public void testTasksBeyondLimits()
|
|
|
+ throws Exception {
|
|
|
+
|
|
|
+ // Run the test only if memory management is enabled
|
|
|
+ if (!isProcfsBasedTreeAvailable()) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- long PER_TASK_LIMIT = 444; // Enough to kill off WordCount.
|
|
|
- Pattern diagMsgPattern = Pattern
|
|
|
- .compile("TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond "
|
|
|
- + "memory-limits. Current usage : [0-9]*kB. Limit : "
|
|
|
- + PER_TASK_LIMIT + "kB. Killing task.");
|
|
|
+ long PER_TASK_LIMIT = 444; // Low enough to kill off sleepJob tasks.
|
|
|
+ long TASK_TRACKER_LIMIT = 10000000000L; // Large so as to fit total usage
|
|
|
+ Pattern taskOverLimitPattern =
|
|
|
+ Pattern.compile(String.format(taskOverLimitPatternString, String
|
|
|
+ .valueOf(PER_TASK_LIMIT)));
|
|
|
Matcher mat = null;
|
|
|
|
|
|
// Start cluster with proper configuration.
|
|
|
JobConf fConf = new JobConf();
|
|
|
- fConf.setLong("mapred.tasktracker.tasks.maxmemory", Long.valueOf(100000));
|
|
|
- fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300));
|
|
|
- //very small value, so that no task escapes to successful completion.
|
|
|
+ fConf.setLong("mapred.tasktracker.tasks.maxmemory", TASK_TRACKER_LIMIT);
|
|
|
+
|
|
|
+ // very small value, so that no task escapes to successful completion.
|
|
|
+ fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
|
|
|
+ String.valueOf(300));
|
|
|
startCluster(fConf);
|
|
|
|
|
|
// Set up job.
|
|
@@ -198,7 +252,7 @@ public class TestTaskTrackerMemoryManager extends TestCase {
|
|
|
// Start the job.
|
|
|
boolean success = true;
|
|
|
try {
|
|
|
- runWordCount(conf);
|
|
|
+ runSleepJob(conf);
|
|
|
success = true;
|
|
|
} catch (Exception e) {
|
|
|
success = false;
|
|
@@ -207,8 +261,6 @@ public class TestTaskTrackerMemoryManager extends TestCase {
|
|
|
// Job has to fail
|
|
|
assertFalse(success);
|
|
|
|
|
|
- // Alas, we don't have a way to get job id/Task completion events from
|
|
|
- // WordCount
|
|
|
JobClient jClient = new JobClient(conf);
|
|
|
JobStatus[] jStatus = jClient.getAllJobs();
|
|
|
JobStatus js = jStatus[0]; // Our only job
|
|
@@ -222,18 +274,104 @@ public class TestTaskTrackerMemoryManager extends TestCase {
|
|
|
assert (tce.getTaskStatus() == TaskCompletionEvent.Status.TIPFAILED || tce
|
|
|
.getTaskStatus() == TaskCompletionEvent.Status.FAILED);
|
|
|
|
|
|
- String[] diagnostics = jClient.jobSubmitClient.getTaskDiagnostics(tce
|
|
|
- .getTaskAttemptId());
|
|
|
+ String[] diagnostics =
|
|
|
+ jClient.jobSubmitClient.getTaskDiagnostics(tce.getTaskAttemptId());
|
|
|
|
|
|
// Every task HAS to spit out the out-of-memory errors
|
|
|
assert (diagnostics != null);
|
|
|
|
|
|
for (String str : diagnostics) {
|
|
|
- mat = diagMsgPattern.matcher(str);
|
|
|
+ mat = taskOverLimitPattern.matcher(str);
|
|
|
// Every task HAS to spit out the out-of-memory errors in the same
|
|
|
// format. And these are the only diagnostic messages.
|
|
|
assertTrue(mat.find());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test for verifying that tasks causing cumulative usage to go beyond TT's
|
|
|
+ * limit get killed even though they all are under individual limits. Memory
|
|
|
+ * management for tasks with disabled task-limits also traverses the same
|
|
|
+ * code-path, so we don't need a separate testTaskLimitsDisabled.
|
|
|
+ *
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ public void testTasksCumulativelyExceedingTTLimits()
|
|
|
+ throws Exception {
|
|
|
+
|
|
|
+ // Run the test only if memory management is enabled
|
|
|
+ if (!isProcfsBasedTreeAvailable()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Large enough for SleepJob Tasks.
|
|
|
+ long PER_TASK_LIMIT = 100000000000L;
|
|
|
+ // Very Limited TT. All tasks will be killed.
|
|
|
+ long TASK_TRACKER_LIMIT = 100L;
|
|
|
+ Pattern taskOverLimitPattern =
|
|
|
+ Pattern.compile(String.format(taskOverLimitPatternString, String
|
|
|
+ .valueOf(PER_TASK_LIMIT)));
|
|
|
+ Pattern trackerOverLimitPattern =
|
|
|
+ Pattern.compile("Killing one of the least progress tasks - .*, as "
|
|
|
+ + "the cumulative memory usage of all the tasks on the TaskTracker"
|
|
|
+ + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
|
|
|
+ Matcher mat = null;
|
|
|
+
|
|
|
+ // Start cluster with proper configuration.
|
|
|
+ JobConf fConf = new JobConf();
|
|
|
+ fConf.setLong("mapred.tasktracker.tasks.maxmemory", TASK_TRACKER_LIMIT);
|
|
|
+ // very small value, so that no task escapes to successful completion.
|
|
|
+ fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
|
|
|
+ String.valueOf(300));
|
|
|
+
|
|
|
+ startCluster(fConf);
|
|
|
+
|
|
|
+ // Set up job.
|
|
|
+ JobConf conf = new JobConf();
|
|
|
+ conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
|
|
|
+ JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
|
|
|
+ conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
|
|
|
+ + jt.getTrackerPort());
|
|
|
+ NameNode nn = miniDFSCluster.getNameNode();
|
|
|
+ conf.set("fs.default.name", "hdfs://"
|
|
|
+ + nn.getNameNodeAddress().getHostName() + ":"
|
|
|
+ + nn.getNameNodeAddress().getPort());
|
|
|
+
|
|
|
+ JobClient jClient = new JobClient(conf);
|
|
|
+ SleepJob sleepJob = new SleepJob();
|
|
|
+ sleepJob.setConf(conf);
|
|
|
+ // Start the job
|
|
|
+ RunningJob job =
|
|
|
+ jClient.submitJob(sleepJob.setupJobConf(1, 1, 5000, 1, 1000, 1));
|
|
|
+ boolean TTOverFlowMsgPresent = false;
|
|
|
+ while (true) {
|
|
|
+ // Set-up tasks are the first to be launched.
|
|
|
+ TaskReport[] setUpReports = jt.getSetupTaskReports(job.getID());
|
|
|
+ for (TaskReport tr : setUpReports) {
|
|
|
+ String[] diag = tr.getDiagnostics();
|
|
|
+ for (String str : diag) {
|
|
|
+ mat = taskOverLimitPattern.matcher(str);
|
|
|
+ assertFalse(mat.find());
|
|
|
+ mat = trackerOverLimitPattern.matcher(str);
|
|
|
+ if (mat.find()) {
|
|
|
+ TTOverFlowMsgPresent = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (TTOverFlowMsgPresent) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // nothing
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // If it comes here without a test-timeout, it means there was a task that
|
|
|
+ // was killed because of crossing cumulative TT limit.
|
|
|
+
|
|
|
+ // Test succeeded, kill the job.
|
|
|
+ job.killJob();
|
|
|
+ }
|
|
|
+}
|