@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ProcessTree;
+import org.apache.hadoop.util.Shell;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A JUnit test that kills a job whose tasks spawn children and checks whether
+ * the children (subprocesses of the Java task) are also killed when a task is
+ * killed.
+ */
+public class TestKillSubProcesses extends TestCase {
+
+  private static volatile Log LOG = LogFactory
+      .getLog(TestKillSubProcesses.class);
+
+  private static String TEST_ROOT_DIR = new File(System.getProperty(
+      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+
+  private static JobClient jobClient = null;
+
+  private static MiniMRCluster mr = null;
+  private static Path scriptDir = null;
+  private static String scriptDirName = null;
+  private static String pid = null;
+
+  // number of levels in the subtree of subprocesses of map task
+  private static int numLevelsOfSubProcesses = 4;
+
+  /**
+   * Runs a job, kills the job and verifies if the map task and its
+   * subprocesses are also killed properly or not.
+   */
+  private static void runKillingJobAndValidate(JobTracker jt, JobConf conf)
+      throws IOException {
+
+    conf.setJobName("testkilljobsubprocesses");
+    conf.setMapperClass(KillingMapperWithChildren.class);
+
+    scriptDir = new Path(TEST_ROOT_DIR + "/script");
+    RunningJob job = runJobAndSetProcessHandle(jt, conf);
+
+    // kill the job now
+    job.killJob();
+
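+    // cleanupProgress() reports the progress of the job's cleanup tasks; it
+    // stays at 0.0f until cleanup starts, so this loop waits for the kill
+    // request to actually begin tearing the job down.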
+    while (job.cleanupProgress() == 0.0f) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {
+        LOG.warn("sleep is interrupted: " + ie);
+        break;
+      }
+    }
+
+    validateKillingSubprocesses(job, conf);
+    // Checking the Job status
+    assertEquals(JobStatus.KILLED, job.getJobState());
+  }
+
+  /**
+   * Runs a job that will fail and verifies if the subprocesses of the failed
+   * map task are killed properly or not.
+   */
+  private static void runFailingJobAndValidate(JobTracker jt, JobConf conf)
+      throws IOException {
+
+    conf.setJobName("testfailjobsubprocesses");
+    conf.setMapperClass(FailingMapperWithChildren.class);
+
+    // We don't want to run the failing map task 4 times. So we run it once
+    // and check if all the subprocesses are killed properly.
+    conf.setMaxMapAttempts(1);
+
+    scriptDir = new Path(TEST_ROOT_DIR + "/script");
+    RunningJob job = runJobAndSetProcessHandle(jt, conf);
+    signalTask(TEST_ROOT_DIR + "/failjob/signalFile", conf);
+    validateKillingSubprocesses(job, conf);
+    // Checking the Job status
+    assertEquals(JobStatus.FAILED, job.getJobState());
+  }
+
+  /**
+   * Runs a job that will succeed and verifies if the subprocesses of the
+   * succeeded map task are killed properly or not.
+   */
+  private static void runSuccessfulJobAndValidate(JobTracker jt, JobConf conf)
+      throws IOException {
+
+    conf.setJobName("testsucceedjobsubprocesses");
+    conf.setMapperClass(MapperWithChildren.class);
+
+    scriptDir = new Path(TEST_ROOT_DIR + "/script");
+    RunningJob job = runJobAndSetProcessHandle(jt, conf);
+    signalTask(TEST_ROOT_DIR + "/succeedjob/signalFile", conf);
+    validateKillingSubprocesses(job, conf);
+    // Checking the Job status
+    assertEquals(JobStatus.SUCCEEDED, job.getJobState());
+  }
+
+  /**
+   * Runs the given job and saves the pid of the map task.
+   * Also checks if the subprocesses of the map task are alive.
+   */
+  private static RunningJob runJobAndSetProcessHandle(JobTracker jt, JobConf conf)
+      throws IOException {
+    RunningJob job = runJob(conf);
+    while (job.getJobState() != JobStatus.RUNNING) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+
+    pid = null;
+    scriptDirName = scriptDir.toUri().getPath();
+    jobClient = new JobClient(conf);
+
+    // get the taskAttemptID of the map task and use it to get the pid
+    // of the map task
+    TaskReport[] mapReports = jobClient.getMapTaskReports(job.getID());
+
+    JobInProgress jip = jt.getJob(job.getID());
+    for (TaskReport tr : mapReports) {
+      TaskInProgress tip = jip.getTaskInProgress(tr.getTaskID());
+
+      // for this tip, get active tasks of all attempts
+      while (tip.getActiveTasks().size() == 0) {
+        // wait till the activeTasks tree is built
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException ie) {
+          LOG.warn("sleep is interrupted: " + ie);
+          break;
+        }
+      }
+
+      for (Iterator<TaskAttemptID> it =
+          tip.getActiveTasks().keySet().iterator(); it.hasNext();) {
+        TaskAttemptID id = it.next();
+        LOG.info("taskAttemptID of map task is " + id);
+
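+        // getPid() returns null until the spawned task JVM has reported its
+        // process id back to the TaskTracker, so keep polling.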
+        while (pid == null) {
+          pid = mr.getTaskTrackerRunner(0).getTaskTracker().getPid(id);
+          if (pid == null) {
+            try {
+              Thread.sleep(500);
+            } catch (InterruptedException e) {}
+          }
+        }
+        LOG.info("pid of map task is " + pid);
+
+        // Checking if the map task is alive
+        assertTrue(ProcessTree.isAlive(pid));
+        LOG.info("The map task is alive before Job completion, as expected.");
+      }
+    }
+
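+    // The descendant checks below run only when setsid is available, since
+    // ProcessTree can locate and manage the full subtree of task
+    // subprocesses only in that case; otherwise just the task JVM is checked.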
+    // Checking if the descendant processes of map task are alive
+    if (ProcessTree.isSetsidAvailable) {
+      String childPid = UtilsForTests.getPidFromPidFile(
+          scriptDirName + "/childPidFile" + 0);
+      while (childPid == null) {
+        LOG.warn(scriptDirName + "/childPidFile" + 0 + " is null; Sleeping...");
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException ie) {
+          LOG.warn("sleep is interrupted: " + ie);
+          break;
+        }
+        childPid = UtilsForTests.getPidFromPidFile(
+            scriptDirName + "/childPidFile" + 0);
+      }
+
+      // As childPidFile0 (the leaf process in the subtree of processes with
+      // the map task as root) is created, all other child pid files should
+      // have been created already (see the script for details).
+      // Now check if the descendants of the map task are alive.
+      for (int i = 0; i <= numLevelsOfSubProcesses; i++) {
+        childPid = UtilsForTests.getPidFromPidFile(
+            scriptDirName + "/childPidFile" + i);
+        LOG.info("pid of the descendant process at level " + i +
+            " in the subtree of processes (with the map task as the root)" +
+            " is " + childPid);
+        assertTrue("Unexpected: The subprocess at level " + i +
+            " in the subtree is not alive before Job completion",
+            ProcessTree.isAlive(childPid));
+      }
+    }
+    return job;
+  }
+
+  /**
+   * Verifies if the subprocesses of the map task are killed properly.
+   */
+  private static void validateKillingSubprocesses(RunningJob job, JobConf conf)
+      throws IOException {
+    // wait till the job finishes
+    while (!job.isComplete()) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+
+    // Checking if the map task got killed or not
+    assertTrue(!ProcessTree.isAlive(pid));
+    LOG.info("The map task is not alive after Job is completed, as expected.");
+
+    // Checking if the descendant processes of map task are killed properly
+    if (ProcessTree.isSetsidAvailable) {
+      for (int i = 0; i <= numLevelsOfSubProcesses; i++) {
+        String childPid = UtilsForTests.getPidFromPidFile(
+            scriptDirName + "/childPidFile" + i);
+        LOG.info("pid of the descendant process at level " + i +
+            " in the subtree of processes (with the map task as the root)" +
+            " is " + childPid);
+        assertTrue("Unexpected: The subprocess at level " + i +
+            " in the subtree is alive after Job completion",
+            !ProcessTree.isAlive(childPid));
+      }
+    }
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(scriptDir)) {
+      fs.delete(scriptDir, true);
+    }
+  }
+
+  private static RunningJob runJob(JobConf conf) throws IOException {
+
+    final Path inDir = new Path(TEST_ROOT_DIR + "/killjob/input");
+    final Path outDir = new Path(TEST_ROOT_DIR + "/killjob/output");
+
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(scriptDir)) {
+      fs.delete(scriptDir, true);
+    }
+
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(0);
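+    // runChildren() executes inside the map task and rebuilds scriptDir from
+    // "test.build.data" in the job conf, so propagate the test root to it.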
+    conf.set("test.build.data", TEST_ROOT_DIR);
+
+    return UtilsForTests.runJob(conf, inDir, outDir);
+  }
+
+  public void testJobKillFailAndSucceed() throws IOException {
+    if (Shell.WINDOWS) {
+      System.out.println(
+          "setsid doesn't work on WINDOWS as expected. Not testing");
+      return;
+    }
+
+    JobConf conf = null;
+    try {
+      mr = new MiniMRCluster(1, "file:///", 1);
+
+      // run the test cases
+      conf = mr.createJobConf();
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+      runKillingJobAndValidate(jt, conf);
+      runFailingJobAndValidate(jt, conf);
+      runSuccessfulJobAndValidate(jt, conf);
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Creates the signal file that a waiting mapper looks for.
+   */
+  private static void signalTask(String signalFile, JobConf conf) {
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      fs.createNewFile(new Path(signalFile));
+    } catch (IOException e) {
+      LOG.warn("Unable to create signal file. " + e);
+    }
+  }
+
+  /**
+   * Runs a recursive shell script to create a chain of subprocesses.
+   */
+  private static void runChildren(JobConf conf) throws IOException {
+    if (ProcessTree.isSetsidAvailable) {
+      FileSystem fs = FileSystem.get(conf);
+      TEST_ROOT_DIR = new Path(conf.get("test.build.data")).toUri().getPath();
+      scriptDir = new Path(TEST_ROOT_DIR + "/script");
+
+      // create shell script
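+      // The generated script writes its own pid to childPidFile$1 and, for
+      // $1 != 0, recurses with $1-1, creating one subprocess per level; the
+      // level-0 leaf loops forever so the subtree stays alive until killed.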
+      Random rm = new Random();
+      Path scriptPath = new Path(scriptDir, "_shellScript_" + rm.nextInt()
+          + ".sh");
+      String shellScript = scriptPath.toString();
+      String script =
+          "echo $$ > " + scriptDir.toString() + "/childPidFile" + "$1\n" +
+          "echo hello\n" +
+          "if [ $1 != 0 ]\nthen\n" +
+          " sh " + shellScript + " $(($1-1))\n" +
+          "else\n" +
+          " while true\n do\n" +
+          "  sleep 2\n" +
+          " done\n" +
+          "fi";
+      DataOutputStream file = fs.create(scriptPath);
+      file.writeBytes(script);
+      file.close();
+
+      LOG.info("Calling script from map task : " + shellScript);
+      Runtime.getRuntime()
+          .exec(shellScript + " " + numLevelsOfSubProcesses);
+
+      String childPid = UtilsForTests.getPidFromPidFile(scriptDir
+          + "/childPidFile" + 0);
+      while (childPid == null) {
+        LOG.warn(scriptDir + "/childPidFile" + 0 + " is null; Sleeping...");
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException ie) {
+          LOG.warn("sleep is interrupted: " + ie);
+          break;
+        }
+        childPid = UtilsForTests.getPidFromPidFile(scriptDir
+            + "/childPidFile" + 0);
+      }
+    }
+  }
+
+  /**
+   * Mapper that starts children.
+   */
+  static class MapperWithChildren extends MapReduceBase implements
+      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+    FileSystem fs = null;
+    final Path signal = new Path(TEST_ROOT_DIR + "/script/signalFile");
+    public void configure(JobConf conf) {
+      try {
+        // initialize fs so that map() can poll for the signal file
+        fs = FileSystem.get(conf);
+        runChildren(conf);
+      } catch (Exception e) {
+        LOG.warn("Exception in configure: " +
+            StringUtils.stringifyException(e));
+      }
+    }
+
+    // Mapper waits for the signal (signal is the existence of a file)
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+      if (fs != null) {
+        while (!fs.exists(signal)) { // wait for signal file creation
+          try {
+            reporter.progress();
+            synchronized (this) {
+              this.wait(1000);
+            }
+          } catch (InterruptedException ie) {
+            System.out.println("Interrupted while the map was waiting for "
+                + "the signal.");
+            break;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Mapper that waits till it gets killed.
+   */
+  static class KillingMapperWithChildren extends MapperWithChildren {
+    public void configure(JobConf conf) {
+      super.configure(conf);
+    }
+
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+
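+      // This mapper never finishes on its own; the test driver calls
+      // job.killJob(), and the resulting task teardown is what ends this loop.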
+      try {
+        while (true) { // just wait till kill happens
+          Thread.sleep(1000);
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Exception in KillingMapperWithChildren.map: " + e);
+      }
+    }
+  }
+
+  /**
+   * Mapper that fails when it receives a signal. Signal is the existence of
+   * a file.
+   */
+  static class FailingMapperWithChildren extends MapperWithChildren {
+    public void configure(JobConf conf) {
+      super.configure(conf);
+    }
+
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+      if (fs != null) {
+        while (!fs.exists(signal)) { // wait for signal file creation
+          try {
+            reporter.progress();
+            synchronized (this) {
+              this.wait(1000);
+            }
+          } catch (InterruptedException ie) {
+            System.out.println("Interrupted while the map was waiting for "
+                + "the signal.");
+            break;
+          }
+        }
+      }
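+      // Once signalled (or if fs could not be initialized), fail the attempt;
+      // runFailingJobAndValidate() sets max map attempts to 1, so this single
+      // failure fails the whole job.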
+      throw new RuntimeException("failing map");
+    }
+  }
+}