|
@@ -52,14 +52,20 @@ public class TestKillSubProcesses extends TestCase {
|
|
|
private static volatile Log LOG = LogFactory
|
|
|
.getLog(TestKillSubProcesses.class);
|
|
|
|
|
|
- private static String TEST_ROOT_DIR = new File(System.getProperty(
|
|
|
- "test.build.data", "/tmp"), "killjob").toURI().toString().replace(' ', '+');
|
|
|
+ private static String BASE_TEST_ROOT_DIR = new File(System.getProperty(
|
|
|
+ "test.build.data", "/tmp")).getAbsolutePath();
|
|
|
+ private static String TEST_ROOT_DIR = BASE_TEST_ROOT_DIR + Path.SEPARATOR
|
|
|
+ + "killSubProcesses";
|
|
|
+
|
|
|
+ private static Path scriptDir = new Path(TEST_ROOT_DIR, "script");
|
|
|
+ private static String scriptDirName = scriptDir.toUri().getPath();
|
|
|
+ private static Path signalFile = new Path(TEST_ROOT_DIR
|
|
|
+ + "/script/signalFile");
|
|
|
|
|
|
private static JobClient jobClient = null;
|
|
|
|
|
|
static MiniMRCluster mr = null;
|
|
|
- private static Path scriptDir = null;
|
|
|
- private static String scriptDirName = null;
|
|
|
+
|
|
|
private static String pid = null;
|
|
|
|
|
|
// number of levels in the subtree of subprocesses of map task
|
|
@@ -74,7 +80,6 @@ public class TestKillSubProcesses extends TestCase {
|
|
|
conf.setJobName("testkilljobsubprocesses");
|
|
|
conf.setMapperClass(KillingMapperWithChildren.class);
|
|
|
|
|
|
- scriptDir = new Path(TEST_ROOT_DIR , "script");
|
|
|
RunningJob job = runJobAndSetProcessHandle(jt, conf);
|
|
|
|
|
|
// kill the job now
|
|
@@ -107,9 +112,8 @@ public class TestKillSubProcesses extends TestCase {
|
|
|
// check if all the subprocesses are killed properly.
|
|
|
conf.setMaxMapAttempts(1);
|
|
|
|
|
|
- scriptDir = new Path(TEST_ROOT_DIR + "/script");
|
|
|
RunningJob job = runJobAndSetProcessHandle(jt, conf);
|
|
|
- signalTask(TEST_ROOT_DIR + "/failjob/signalFile", conf);
|
|
|
+ signalTask(signalFile.toString(), conf);
|
|
|
validateKillingSubprocesses(job, conf);
|
|
|
// Checking the Job status
|
|
|
assertEquals(job.getJobState(), JobStatus.FAILED);
|
|
@@ -125,9 +129,8 @@ public class TestKillSubProcesses extends TestCase {
|
|
|
conf.setJobName("testsucceedjobsubprocesses");
|
|
|
conf.setMapperClass(MapperWithChildren.class);
|
|
|
|
|
|
- scriptDir = new Path(TEST_ROOT_DIR + "/script");
|
|
|
RunningJob job = runJobAndSetProcessHandle(jt, conf);
|
|
|
- signalTask(TEST_ROOT_DIR + "/succeedjob/signalFile", conf);
|
|
|
+ signalTask(signalFile.toString(), conf);
|
|
|
validateKillingSubprocesses(job, conf);
|
|
|
// Checking the Job status
|
|
|
assertEquals(job.getJobState(), JobStatus.SUCCEEDED);
|
|
@@ -149,7 +152,6 @@ public class TestKillSubProcesses extends TestCase {
|
|
|
}
|
|
|
|
|
|
pid = null;
|
|
|
- scriptDirName = scriptDir.toUri().getPath();
|
|
|
jobClient = new JobClient(conf);
|
|
|
|
|
|
// get the taskAttemptID of the map task and use it to get the pid
|
|
@@ -287,7 +289,9 @@ public class TestKillSubProcesses extends TestCase {
|
|
|
|
|
|
conf.setNumMapTasks(1);
|
|
|
conf.setNumReduceTasks(0);
|
|
|
- conf.set("test.build.data", TEST_ROOT_DIR);
|
|
|
+
|
|
|
+ conf.set("mapred.child.java.opts", conf.get("mapred.child.java.opts") +
|
|
|
+ " -Dtest.build.data=" + BASE_TEST_ROOT_DIR);
|
|
|
|
|
|
return UtilsForTests.runJob(conf, inDir, outDir);
|
|
|
}
|
|
@@ -345,18 +349,17 @@ public class TestKillSubProcesses extends TestCase {
|
|
|
private static void runChildren(JobConf conf) throws IOException {
|
|
|
if (ProcessTree.isSetsidAvailable) {
|
|
|
FileSystem fs = FileSystem.getLocal(conf);
|
|
|
- TEST_ROOT_DIR = new Path(conf.get("test.build.data")).toUri().getPath();
|
|
|
- scriptDir = new Path(TEST_ROOT_DIR + "/script");
|
|
|
+
|
|
|
if(fs.exists(scriptDir)){
|
|
|
fs.delete(scriptDir, true);
|
|
|
}
|
|
|
// create shell script
|
|
|
Random rm = new Random();
|
|
|
- Path scriptPath = new Path(scriptDir, "_shellScript_" + rm.nextInt()
|
|
|
+ Path scriptPath = new Path(scriptDirName, "_shellScript_" + rm.nextInt()
|
|
|
+ ".sh");
|
|
|
String shellScript = scriptPath.toString();
|
|
|
String script =
|
|
|
- "echo $$ > " + scriptDir.toString() + "/childPidFile" + "$1\n" +
|
|
|
+ "echo $$ > " + scriptDirName + "/childPidFile" + "$1\n" +
|
|
|
"echo hello\n" +
|
|
|
"trap 'echo got SIGTERM' 15 \n" +
|
|
|
"if [ $1 != 0 ]\nthen\n" +
|
|
@@ -374,17 +377,17 @@ public class TestKillSubProcesses extends TestCase {
|
|
|
Runtime.getRuntime()
|
|
|
.exec(shellScript + " " + numLevelsOfSubProcesses);
|
|
|
|
|
|
- String childPid = UtilsForTests.getPidFromPidFile(scriptDir
|
|
|
+ String childPid = UtilsForTests.getPidFromPidFile(scriptDirName
|
|
|
+ "/childPidFile" + 0);
|
|
|
while (childPid == null) {
|
|
|
- LOG.warn(scriptDir + "/childPidFile" + 0 + " is null; Sleeping...");
|
|
|
+ LOG.warn(scriptDirName + "/childPidFile" + 0 + " is null; Sleeping...");
|
|
|
try {
|
|
|
Thread.sleep(500);
|
|
|
} catch (InterruptedException ie) {
|
|
|
LOG.warn("sleep is interrupted:" + ie);
|
|
|
break;
|
|
|
}
|
|
|
- childPid = UtilsForTests.getPidFromPidFile(scriptDir
|
|
|
+ childPid = UtilsForTests.getPidFromPidFile(scriptDirName
|
|
|
+ "/childPidFile" + 0);
|
|
|
}
|
|
|
}
|
|
@@ -396,9 +399,9 @@ public class TestKillSubProcesses extends TestCase {
|
|
|
static class MapperWithChildren extends MapReduceBase implements
|
|
|
Mapper<WritableComparable, Writable, WritableComparable, Writable> {
|
|
|
FileSystem fs = null;
|
|
|
- final Path signal = new Path(TEST_ROOT_DIR + "/script/signalFile");
|
|
|
public void configure(JobConf conf) {
|
|
|
try {
|
|
|
+ fs = FileSystem.getLocal(conf);
|
|
|
runChildren(conf);
|
|
|
} catch (Exception e) {
|
|
|
LOG.warn("Exception in configure: " +
|
|
@@ -410,18 +413,16 @@ public class TestKillSubProcesses extends TestCase {
|
|
|
public void map(WritableComparable key, Writable value,
|
|
|
OutputCollector<WritableComparable, Writable> out, Reporter reporter)
|
|
|
throws IOException {
|
|
|
- if (fs != null) {
|
|
|
- while (!fs.exists(signal)) {// wait for signal file creation
|
|
|
- try {
|
|
|
- reporter.progress();
|
|
|
- synchronized (this) {
|
|
|
- this.wait(1000);
|
|
|
- }
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- System.out.println("Interrupted while the map was waiting for "
|
|
|
- + " the signal.");
|
|
|
- break;
|
|
|
+ while (!fs.exists(signalFile)) {// wait for signal file creation
|
|
|
+ try {
|
|
|
+ reporter.progress();
|
|
|
+ synchronized (this) {
|
|
|
+ this.wait(1000);
|
|
|
}
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ System.out.println("Interrupted while the map was waiting for "
|
|
|
+ + " the signal.");
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -460,18 +461,16 @@ public class TestKillSubProcesses extends TestCase {
|
|
|
public void map(WritableComparable key, Writable value,
|
|
|
OutputCollector<WritableComparable, Writable> out, Reporter reporter)
|
|
|
throws IOException {
|
|
|
- if (fs != null) {
|
|
|
- while (!fs.exists(signal)) {// wait for signal file creation
|
|
|
- try {
|
|
|
- reporter.progress();
|
|
|
- synchronized (this) {
|
|
|
- this.wait(1000);
|
|
|
- }
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- System.out.println("Interrupted while the map was waiting for "
|
|
|
- + " the signal.");
|
|
|
- break;
|
|
|
+ while (!fs.exists(signalFile)) {// wait for signal file creation
|
|
|
+ try {
|
|
|
+ reporter.progress();
|
|
|
+ synchronized (this) {
|
|
|
+ this.wait(1000);
|
|
|
}
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ System.out.println("Interrupted while the map was waiting for "
|
|
|
+ + " the signal.");
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
throw new RuntimeException("failing map");
|