@@ -21,11 +21,13 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
+import java.util.concurrent.CountDownLatch;
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -53,25 +55,29 @@ public class TestRecoveryManager {
   private MiniMRCluster mr;
 
   @Before
-  public void setUp() {
-    JobConf conf = new JobConf();
-    try {
-      fs = FileSystem.get(new Configuration());
-      fs.delete(TEST_DIR, true);
-      conf.set("mapred.jobtracker.job.history.block.size", "1024");
-      conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
-      mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
+  public void setUp() throws IOException {
+    fs = FileSystem.get(new Configuration());
+    fs.delete(TEST_DIR, true);
+  }
+
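+  // Tests start the cluster themselves so each can supply its own JobConf.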
+  private void startCluster() throws IOException {
+    startCluster(new JobConf());
+  }
+
+  private void startCluster(JobConf conf) throws IOException {
+    mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+  }
 
   @After
   public void tearDown() {
-    ClusterStatus status = mr.getJobTrackerRunner().getJobTracker()
-        .getClusterStatus(false);
-    if (status.getJobTrackerState() == JobTracker.State.RUNNING) {
-      mr.shutdown();
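+    // The cluster is started lazily, so mr may still be null here.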
+    if (mr != null) {
+      ClusterStatus status = mr.getJobTrackerRunner().getJobTracker()
+          .getClusterStatus(false);
+      if (status.getJobTrackerState() == JobTracker.State.RUNNING) {
+        mr.shutdown();
+      }
     }
   }
 
@@ -87,6 +93,7 @@ public class TestRecoveryManager {
   @Test(timeout=120000)
   public void testJobTrackerRestartsWithMissingJobFile() throws Exception {
     LOG.info("Testing jobtracker restart with faulty job");
+    startCluster();
     String signalFile = new Path(TEST_DIR, "signal").toString();
 
     JobConf job1 = mr.createJobConf();
@@ -168,6 +175,7 @@ public class TestRecoveryManager {
   @Test(timeout=120000)
   public void testJobResubmission() throws Exception {
     LOG.info("Testing Job Resubmission");
+    startCluster();
     String signalFile = new Path(TEST_DIR, "signal").toString();
 
     // make sure that the jobtracker is in recovery mode
@@ -216,6 +224,80 @@ public class TestRecoveryManager {
     Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
   }
 
+  public static class TestJobTrackerInstrumentation extends JobTrackerInstrumentation {
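+    // Latch that is released the first time the JobTracker attempts to
+    // finalize a job; the test waits on it before killing the jobtracker.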
+    static CountDownLatch finalizeCall = new CountDownLatch(1);
+
+    public TestJobTrackerInstrumentation(JobTracker jt, JobConf conf) {
+      super(jt, conf);
+    }
+
+    @Override
+    public void finalizeJob(JobConf conf, JobID id) {
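+      // Throw only on the first call so finalization can succeed after the restart.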
+      if (finalizeCall.getCount() == 0) {
+        return;
+      }
+      finalizeCall.countDown();
+      throw new IllegalStateException("Controlled error finalizing job");
+    }
+  }
+
+  @Test(timeout=120000)
+  public void testJobTrackerRestartBeforeJobFinalization() throws Exception {
+    LOG.info("Testing jobtracker restart before job finalization");
+
+    JobConf conf = new JobConf();
+    // make sure that the jobtracker is in recovery mode
+    conf.setBoolean("mapred.jobtracker.restart.recover", true);
+
+    // use a test JobTrackerInstrumentation implementation to shut down
+    // the jobtracker after the tasks have all completed, but before the
+    // job is finalized, and check that it can be recovered correctly
+    conf.setClass("mapred.jobtracker.instrumentation",
+        TestJobTrackerInstrumentation.class, JobTrackerInstrumentation.class);
+
+    startCluster(conf);
+
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
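+    // Submit a small SleepJob (one map, no reduces) so there is a job to recover.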
+    SleepJob job = new SleepJob();
+    job.setConf(mr.createJobConf());
+    JobConf job1 = job.setupJobConf(1, 0, 1, 1, 1, 1);
+    JobClient jc = new JobClient(job1);
+    RunningJob rJob1 = jc.submitJob(job1);
+    LOG.info("Submitted job " + rJob1.getID());
+
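+    // Wait until the tasks are done and the first finalization attempt has failed.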
+    TestJobTrackerInstrumentation.finalizeCall.await();
+
+    // kill the jobtracker
+    LOG.info("Stopping jobtracker");
+    mr.stopJobTracker();
+
+    // start the jobtracker
+    LOG.info("Starting jobtracker");
+    mr.startJobTracker();
+    UtilsForTests.waitForJobTracker(jc);
+
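+    // Re-fetch the JobTracker; the restart created a new instance.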
+    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+    // assert that the job is recovered by the jobtracker
+    Assert.assertEquals("Job was not recovered", 1,
+        jobtracker.getAllJobs().length);
+
+    // wait for the job to complete
+    JobInProgress jip = jobtracker.getJob(rJob1.getID());
+    while (!jip.isComplete()) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
+      UtilsForTests.waitFor(100);
+    }
+    Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
+  }
+
   /**
    * Tests the {@link JobTracker.RecoveryManager} against the exceptions thrown
    * during recovery. It does the following :
@@ -231,6 +313,7 @@ public class TestRecoveryManager {
   @Test(timeout=120000)
   public void testJobTrackerRestartWithBadJobs() throws Exception {
     LOG.info("Testing recovery-manager");
+    startCluster();
     String signalFile = new Path(TEST_DIR, "signal").toString();
     // make sure that the jobtracker is in recovery mode
     mr.getJobTrackerConf()
@@ -362,6 +445,7 @@ public class TestRecoveryManager {
   @Test(timeout=120000)
   public void testRestartCount() throws Exception {
     LOG.info("Testing Job Restart Count");
+    startCluster();
     String signalFile = new Path(TEST_DIR, "signal").toString();
     // make sure that the jobtracker is in recovery mode
     mr.getJobTrackerConf()