|
@@ -34,11 +34,12 @@ import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
|
|
|
-import org.apache.hadoop.mapreduce.MRConfig;
|
|
|
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
|
|
|
+import org.apache.hadoop.mapreduce.MRConfig;
|
|
|
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
|
|
|
+
|
|
|
/**
|
|
|
* Test whether the {@link RecoveryManager} is able to tolerate job-recovery
|
|
|
* failures and the jobtracker is able to tolerate {@link RecoveryManager}
|
|
@@ -65,16 +66,19 @@ public class TestRecoveryManager extends TestCase {
|
|
|
public void testJobTracker() throws Exception {
|
|
|
LOG.info("Testing jobtracker restart with faulty job");
|
|
|
String signalFile = new Path(TEST_DIR, "signal").toString();
|
|
|
+ System
|
|
|
+ .setProperty("hadoop.log.dir", System.getProperty("java.io.tmpdir"));
|
|
|
JobConf conf = new JobConf();
|
|
|
|
|
|
FileSystem fs = FileSystem.get(new Configuration());
|
|
|
fs.delete(TEST_DIR, true); // cleanup
|
|
|
|
|
|
- conf.set(JTConfig.JT_JOBHISTORY_BLOCK_SIZE, "1024");
|
|
|
+ conf.set("mapred.jobtracker.job.history.block.size", "1024");
|
|
|
+ conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
|
|
|
|
|
|
MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
|
|
|
|
|
|
- JobConf job1 = mr.createJobConf();
|
|
|
+ JobConf job1 = mr.createJobConf();
|
|
|
|
|
|
UtilsForTests.configureWaitingJobConf(job1,
|
|
|
new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 2, 0,
|
|
@@ -88,7 +92,7 @@ public class TestRecoveryManager extends TestCase {
|
|
|
LOG.info("Waiting for job " + rJob1.getID() + " to be 20% done");
|
|
|
UtilsForTests.waitFor(100);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
JobConf job2 = mr.createJobConf();
|
|
|
|
|
|
UtilsForTests.configureWaitingJobConf(job2,
|
|
@@ -137,7 +141,7 @@ public class TestRecoveryManager extends TestCase {
|
|
|
assertEquals("No of recovered jobs not correct",
|
|
|
1, mr.getJobTrackerRunner().getJobTracker().
|
|
|
recoveryManager.getRecovered());
|
|
|
-
|
|
|
+ fs.create(new Path(TEST_DIR, "signal"));
|
|
|
mr.shutdown();
|
|
|
}
|
|
|
|
|
@@ -156,6 +160,8 @@ public class TestRecoveryManager extends TestCase {
|
|
|
public void testRecoveryManager() throws Exception {
|
|
|
LOG.info("Testing recovery-manager");
|
|
|
String signalFile = new Path(TEST_DIR, "signal").toString();
|
|
|
+ System
|
|
|
+ .setProperty("hadoop.log.dir", System.getProperty("java.io.tmpdir"));
|
|
|
|
|
|
// clean up
|
|
|
FileSystem fs = FileSystem.get(new Configuration());
|
|
@@ -243,6 +249,8 @@ public class TestRecoveryManager extends TestCase {
|
|
|
|
|
|
mr.getJobTrackerConf().setBoolean(MRConfig.MR_ACLS_ENABLED, true);
|
|
|
|
|
|
+ mr.getJobTrackerConf().set("mapred.queue.names", "default");
|
|
|
+
|
|
|
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
|
|
|
mr.getJobTrackerConf().set(toFullPropertyName(
|
|
|
"default", QueueACL.SUBMIT_JOB.getAclName()), ugi.getUserName());
|
|
@@ -273,7 +281,8 @@ public class TestRecoveryManager extends TestCase {
|
|
|
|
|
|
status = jobtracker.getJobStatus(rJob3.getID());
|
|
|
assertNull("Job should be missing", status);
|
|
|
-
|
|
|
+ fs.create(new Path(TEST_DIR, "signal"));
|
|
|
+ fs.create(new Path(TEST_DIR, "signal1"));
|
|
|
mr.shutdown();
|
|
|
}
|
|
|
|