|
@@ -24,6 +24,7 @@ import java.io.FileInputStream;
|
|
|
import java.io.FileNotFoundException;
|
|
|
import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
+import java.io.InputStreamReader;
|
|
|
import java.io.StringReader;
|
|
|
import java.net.URI;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
@@ -980,6 +981,124 @@ public class TestMRJobs {
|
|
|
_testDistributedCache(remoteJobJarPath.toUri().toString());
|
|
|
}
|
|
|
|
|
|
+ @Test(timeout = 120000)
|
|
|
+ public void testThreadDumpOnTaskTimeout() throws IOException,
|
|
|
+ InterruptedException, ClassNotFoundException {
|
|
|
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
|
|
|
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
|
|
|
+ + " not found. Not running test.");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ final SleepJob sleepJob = new SleepJob();
|
|
|
+ final JobConf sleepConf = new JobConf(mrCluster.getConfig());
|
|
|
+ sleepConf.setLong(MRJobConfig.TASK_TIMEOUT, 3 * 1000L);
|
|
|
+ sleepConf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
|
|
|
+ sleepJob.setConf(sleepConf);
|
|
|
+ if (this instanceof TestUberAM) {
|
|
|
+ sleepConf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS,
|
|
|
+ 30 * 1000);
|
|
|
+ }
|
|
|
+ // sleep for 10 seconds to trigger a kill with thread dump
|
|
|
+ final Job job = sleepJob.createJob(1, 0, 10 * 60 * 1000L, 1, 0L, 0);
|
|
|
+ job.setJarByClass(SleepJob.class);
|
|
|
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
|
|
|
+ job.waitForCompletion(true);
|
|
|
+ final JobId jobId = TypeConverter.toYarn(job.getJobID());
|
|
|
+ final ApplicationId appID = jobId.getAppId();
|
|
|
+ int pollElapsed = 0;
|
|
|
+ while (true) {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ pollElapsed += 1000;
|
|
|
+ if (TERMINAL_RM_APP_STATES.contains(mrCluster.getResourceManager()
|
|
|
+ .getRMContext().getRMApps().get(appID).getState())) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if (pollElapsed >= 60000) {
|
|
|
+ LOG.warn("application did not reach terminal state within 60 seconds");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Job finished, verify logs
|
|
|
+ //
|
|
|
+
|
|
|
+ final String appIdStr = appID.toString();
|
|
|
+ final String appIdSuffix = appIdStr.substring("application_".length(),
|
|
|
+ appIdStr.length());
|
|
|
+ final String containerGlob = "container_" + appIdSuffix + "_*_*";
|
|
|
+ final String syslogGlob = appIdStr
|
|
|
+ + Path.SEPARATOR + containerGlob
|
|
|
+ + Path.SEPARATOR + TaskLog.LogName.SYSLOG;
|
|
|
+ int numAppMasters = 0;
|
|
|
+ int numMapTasks = 0;
|
|
|
+
|
|
|
+ for (int i = 0; i < NUM_NODE_MGRS; i++) {
|
|
|
+ final Configuration nmConf = mrCluster.getNodeManager(i).getConfig();
|
|
|
+ for (String logDir :
|
|
|
+ nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS)) {
|
|
|
+ final Path absSyslogGlob =
|
|
|
+ new Path(logDir + Path.SEPARATOR + syslogGlob);
|
|
|
+ LOG.info("Checking for glob: " + absSyslogGlob);
|
|
|
+ for (FileStatus syslog : localFs.globStatus(absSyslogGlob)) {
|
|
|
+ boolean foundAppMaster = false;
|
|
|
+ boolean foundThreadDump = false;
|
|
|
+
|
|
|
+ // Determine the container type
|
|
|
+ final BufferedReader syslogReader = new BufferedReader(
|
|
|
+ new InputStreamReader(localFs.open(syslog.getPath())));
|
|
|
+ try {
|
|
|
+ for (String line; (line = syslogReader.readLine()) != null; ) {
|
|
|
+ if (line.contains(MRAppMaster.class.getName())) {
|
|
|
+ foundAppMaster = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ syslogReader.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check for thread dump in stdout
|
|
|
+ final Path stdoutPath = new Path(syslog.getPath().getParent(),
|
|
|
+ TaskLog.LogName.STDOUT.toString());
|
|
|
+ final BufferedReader stdoutReader = new BufferedReader(
|
|
|
+ new InputStreamReader(localFs.open(stdoutPath)));
|
|
|
+ try {
|
|
|
+ for (String line; (line = stdoutReader.readLine()) != null; ) {
|
|
|
+ if (line.contains("Full thread dump")) {
|
|
|
+ foundThreadDump = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ stdoutReader.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (foundAppMaster) {
|
|
|
+ numAppMasters++;
|
|
|
+ if (this instanceof TestUberAM) {
|
|
|
+ Assert.assertTrue("No thread dump", foundThreadDump);
|
|
|
+ } else {
|
|
|
+ Assert.assertFalse("Unexpected thread dump", foundThreadDump);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ numMapTasks++;
|
|
|
+ Assert.assertTrue("No thread dump", foundThreadDump);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Make sure we checked non-empty set
|
|
|
+ //
|
|
|
+ Assert.assertEquals("No AppMaster log found!", 1, numAppMasters);
|
|
|
+ if (sleepConf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false)) {
|
|
|
+ Assert.assertSame("MapTask log with uber found!", 0, numMapTasks);
|
|
|
+ } else {
|
|
|
+ Assert.assertSame("No MapTask log found!", 1, numMapTasks);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private Path createTempFile(String filename, String contents)
|
|
|
throws IOException {
|
|
|
Path path = new Path(TEST_ROOT_DIR, filename);
|