|
@@ -24,6 +24,8 @@ import java.util.List;
|
|
|
|
|
|
import junit.framework.Assert;
|
|
import junit.framework.Assert;
|
|
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -38,6 +40,9 @@ import org.junit.Test;
|
|
*/
|
|
*/
|
|
public class TestMapReduceJobControl extends HadoopTestCase {
|
|
public class TestMapReduceJobControl extends HadoopTestCase {
|
|
|
|
|
|
|
|
+ public static final Log LOG =
|
|
|
|
+ LogFactory.getLog(TestMapReduceJobControl.class.getName());
|
|
|
|
+
|
|
static Path rootDataDir = new Path(
|
|
static Path rootDataDir = new Path(
|
|
System.getProperty("test.build.data", "."), "TestData");
|
|
System.getProperty("test.build.data", "."), "TestData");
|
|
static Path indir = new Path(rootDataDir, "indir");
|
|
static Path indir = new Path(rootDataDir, "indir");
|
|
@@ -117,6 +122,7 @@ public class TestMapReduceJobControl extends HadoopTestCase {
|
|
}
|
|
}
|
|
|
|
|
|
public void testJobControlWithFailJob() throws Exception {
|
|
public void testJobControlWithFailJob() throws Exception {
|
|
|
|
+ LOG.info("Starting testJobControlWithFailJob");
|
|
Configuration conf = createJobConf();
|
|
Configuration conf = createJobConf();
|
|
|
|
|
|
cleanupData(conf);
|
|
cleanupData(conf);
|
|
@@ -139,6 +145,8 @@ public class TestMapReduceJobControl extends HadoopTestCase {
|
|
}
|
|
}
|
|
|
|
|
|
public void testJobControlWithKillJob() throws Exception {
|
|
public void testJobControlWithKillJob() throws Exception {
|
|
|
|
+ LOG.info("Starting testJobControlWithKillJob");
|
|
|
|
+
|
|
Configuration conf = createJobConf();
|
|
Configuration conf = createJobConf();
|
|
cleanupData(conf);
|
|
cleanupData(conf);
|
|
Job job1 = MapReduceTestUtil.createKillJob(conf, outdir_1, indir);
|
|
Job job1 = MapReduceTestUtil.createKillJob(conf, outdir_1, indir);
|
|
@@ -175,6 +183,8 @@ public class TestMapReduceJobControl extends HadoopTestCase {
|
|
}
|
|
}
|
|
|
|
|
|
public void testJobControl() throws Exception {
|
|
public void testJobControl() throws Exception {
|
|
|
|
+ LOG.info("Starting testJobControl");
|
|
|
|
+
|
|
Configuration conf = createJobConf();
|
|
Configuration conf = createJobConf();
|
|
|
|
|
|
cleanupData(conf);
|
|
cleanupData(conf);
|
|
@@ -193,10 +203,12 @@ public class TestMapReduceJobControl extends HadoopTestCase {
|
|
|
|
|
|
@Test(timeout = 30000)
|
|
@Test(timeout = 30000)
|
|
public void testControlledJob() throws Exception {
|
|
public void testControlledJob() throws Exception {
|
|
|
|
+ LOG.info("Starting testControlledJob");
|
|
|
|
+
|
|
Configuration conf = createJobConf();
|
|
Configuration conf = createJobConf();
|
|
cleanupData(conf);
|
|
cleanupData(conf);
|
|
Job job1 = MapReduceTestUtil.createCopyJob(conf, outdir_1, indir);
|
|
Job job1 = MapReduceTestUtil.createCopyJob(conf, outdir_1, indir);
|
|
- createDependencies(conf, job1);
|
|
|
|
|
|
+ JobControl theControl = createDependencies(conf, job1);
|
|
while (cjob1.getJobState() != ControlledJob.State.RUNNING) {
|
|
while (cjob1.getJobState() != ControlledJob.State.RUNNING) {
|
|
try {
|
|
try {
|
|
Thread.sleep(100);
|
|
Thread.sleep(100);
|
|
@@ -205,5 +217,10 @@ public class TestMapReduceJobControl extends HadoopTestCase {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Assert.assertNotNull(cjob1.getMapredJobId());
|
|
Assert.assertNotNull(cjob1.getMapredJobId());
|
|
|
|
+
|
|
|
|
+ // wait till all the jobs complete
|
|
|
|
+ waitTillAllFinished(theControl);
|
|
|
|
+ assertEquals("Some jobs failed", 0, theControl.getFailedJobList().size());
|
|
|
|
+ theControl.stop();
|
|
}
|
|
}
|
|
}
|
|
}
|