|
@@ -84,8 +84,7 @@ public class TestRecoveryManager {
|
|
|
* - restarts the jobtracker
|
|
|
* - checks if the jobtracker starts normally
|
|
|
*/
|
|
|
- @Test
|
|
|
- @Ignore
|
|
|
+ @Test(timeout=120000)
|
|
|
public void testJobTrackerRestartsWithMissingJobFile() throws Exception {
|
|
|
LOG.info("Testing jobtracker restart with faulty job");
|
|
|
String signalFile = new Path(TEST_DIR, "signal").toString();
|
|
@@ -111,7 +110,7 @@ public class TestRecoveryManager {
|
|
|
new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 30, 0,
|
|
|
"test-recovery-manager", signalFile, signalFile);
|
|
|
|
|
|
- // submit the faulty job
|
|
|
+ // submit another job
|
|
|
RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
|
|
|
LOG.info("Submitted job " + rJob2.getID());
|
|
|
|
|
@@ -129,7 +128,7 @@ public class TestRecoveryManager {
|
|
|
Path jobFile =
|
|
|
new Path(sysDir, rJob1.getID().toString() + "/" + JobTracker.JOB_INFO_FILE);
|
|
|
LOG.info("Deleting job token file : " + jobFile.toString());
|
|
|
- fs.delete(jobFile, false); // delete the job.xml file
|
|
|
+ Assert.assertTrue(fs.delete(jobFile, false)); // delete the job.xml file
|
|
|
|
|
|
// create the job.xml file with 1 bytes
|
|
|
FSDataOutputStream out = fs.create(jobFile);
|
|
@@ -142,12 +141,22 @@ public class TestRecoveryManager {
|
|
|
// start the jobtracker
|
|
|
LOG.info("Starting jobtracker");
|
|
|
mr.startJobTracker();
|
|
|
- ClusterStatus status =
|
|
|
- mr.getJobTrackerRunner().getJobTracker().getClusterStatus(false);
|
|
|
+ JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
|
|
|
+ ClusterStatus status = jobtracker.getClusterStatus(false);
|
|
|
|
|
|
// check if the jobtracker came up or not
|
|
|
Assert.assertEquals("JobTracker crashed!",
|
|
|
JobTracker.State.RUNNING, status.getJobTrackerState());
|
|
|
+
|
|
|
+ // wait for job 2 to complete
|
|
|
+ JobInProgress jip = jobtracker.getJob(rJob2.getID());
|
|
|
+ while (!jip.isComplete()) {
|
|
|
+ LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
|
|
|
+ // Signaling Map task to complete
|
|
|
+ fs.create(new Path(TEST_DIR, "signal"));
|
|
|
+ UtilsForTests.waitFor(100);
|
|
|
+ }
|
|
|
+ Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -156,8 +165,7 @@ public class TestRecoveryManager {
|
|
|
* - kills the jobtracker
|
|
|
* - checks if the jobtracker starts normally and job is recovered while
|
|
|
*/
|
|
|
- @Test
|
|
|
- @Ignore
|
|
|
+ @Test(timeout=120000)
|
|
|
public void testJobResubmission() throws Exception {
|
|
|
LOG.info("Testing Job Resubmission");
|
|
|
String signalFile = new Path(TEST_DIR, "signal").toString();
|
|
@@ -196,6 +204,8 @@ public class TestRecoveryManager {
|
|
|
// assert that job is recovered by the jobtracker
|
|
|
Assert.assertEquals("Resubmission failed ", 1,
|
|
|
jobtracker.getAllJobs().length);
|
|
|
+
|
|
|
+ // wait for job 1 to complete
|
|
|
JobInProgress jip = jobtracker.getJob(rJob1.getID());
|
|
|
while (!jip.isComplete()) {
|
|
|
LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
|
|
@@ -218,8 +228,7 @@ public class TestRecoveryManager {
|
|
|
* - checks if the jobtracker starts normally and job#2 is recovered while
|
|
|
* job#1 is failed.
|
|
|
*/
|
|
|
- @Test
|
|
|
- @Ignore
|
|
|
+ @Test(timeout=120000)
|
|
|
public void testJobTrackerRestartWithBadJobs() throws Exception {
|
|
|
LOG.info("Testing recovery-manager");
|
|
|
String signalFile = new Path(TEST_DIR, "signal").toString();
|
|
@@ -234,7 +243,7 @@ public class TestRecoveryManager {
|
|
|
job1.setJobPriority(JobPriority.HIGH);
|
|
|
|
|
|
UtilsForTests.configureWaitingJobConf(job1,
|
|
|
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 30, 0,
|
|
|
+ new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 30, 0,
|
|
|
"test-recovery-manager", signalFile, signalFile);
|
|
|
|
|
|
// submit the faulty job
|
|
@@ -252,7 +261,7 @@ public class TestRecoveryManager {
|
|
|
|
|
|
String signalFile1 = new Path(TEST_DIR, "signal1").toString();
|
|
|
UtilsForTests.configureWaitingJobConf(job2,
|
|
|
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 20, 0,
|
|
|
+ new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 20, 0,
|
|
|
"test-recovery-manager", signalFile1, signalFile1);
|
|
|
|
|
|
// submit the job
|
|
@@ -273,7 +282,7 @@ public class TestRecoveryManager {
|
|
|
UserGroupInformation.createUserForTesting("abc", new String[]{"users"});
|
|
|
|
|
|
UtilsForTests.configureWaitingJobConf(job3,
|
|
|
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 1, 0,
|
|
|
+ new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 1, 0,
|
|
|
"test-recovery-manager", signalFile, signalFile);
|
|
|
|
|
|
// submit the job
|
|
@@ -326,6 +335,15 @@ public class TestRecoveryManager {
|
|
|
|
|
|
status = jobtracker.getJobStatus(rJob3.getID());
|
|
|
Assert.assertNull("Job should be missing because of ACL changed", status);
|
|
|
+
|
|
|
+ // wait for job 2 to complete
|
|
|
+ while (!jip.isComplete()) {
|
|
|
+ LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
|
|
|
+ // Signaling Map task to complete
|
|
|
+ fs.create(new Path(TEST_DIR, "signal1"));
|
|
|
+ UtilsForTests.waitFor(100);
|
|
|
+ }
|
|
|
+ Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -341,7 +359,7 @@ public class TestRecoveryManager {
|
|
|
* - garble the jobtracker.info file and restart the jobtracker, the
|
|
|
* jobtracker should crash.
|
|
|
*/
|
|
|
- @Test
|
|
|
+ @Test(timeout=120000)
|
|
|
public void testRestartCount() throws Exception {
|
|
|
LOG.info("Testing Job Restart Count");
|
|
|
String signalFile = new Path(TEST_DIR, "signal").toString();
|
|
@@ -356,7 +374,7 @@ public class TestRecoveryManager {
|
|
|
job1.setJobPriority(JobPriority.HIGH);
|
|
|
|
|
|
UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"),
|
|
|
- new Path(TEST_DIR, "output3"), 30, 0, "test-restart", signalFile,
|
|
|
+ new Path(TEST_DIR, "output7"), 30, 0, "test-restart", signalFile,
|
|
|
signalFile);
|
|
|
|
|
|
// submit the faulty job
|
|
@@ -396,7 +414,7 @@ public class TestRecoveryManager {
|
|
|
JobConf job2 = mr.createJobConf();
|
|
|
|
|
|
UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"),
|
|
|
- new Path(TEST_DIR, "output7"), 50, 0, "test-restart-manager",
|
|
|
+ new Path(TEST_DIR, "output8"), 50, 0, "test-restart-manager",
|
|
|
signalFile, signalFile);
|
|
|
|
|
|
// submit a new job
|
|
@@ -430,7 +448,7 @@ public class TestRecoveryManager {
|
|
|
* Test if the jobtracker waits for the info file to be created before
|
|
|
* starting.
|
|
|
*/
|
|
|
- @Test
|
|
|
+ @Test(timeout=120000)
|
|
|
public void testJobTrackerInfoCreation() throws Exception {
|
|
|
LOG.info("Testing jobtracker.info file");
|
|
|
MiniDFSCluster dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
|