@@ -20,9 +20,7 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
-
 import java.util.concurrent.CountDownLatch;
-import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,12 +29,16 @@ import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
 import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
 import org.apache.hadoop.mapred.QueueManager.QueueACL;
-import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 /**
  * Test whether the {@link RecoveryManager} is able to tolerate job-recovery
@@ -51,7 +53,7 @@ public class TestRecoveryManager {
       new Path(System.getProperty("test.build.data", "/tmp"),
           "test-recovery-manager");
   private FileSystem fs;
-  private JobConf conf;
+  private MiniDFSCluster dfs;
   private MiniMRCluster mr;
 
   @Before
@@ -70,11 +72,23 @@ public class TestRecoveryManager {
 
   @After
   public void tearDown() {
-    if (mr != null) {
-      ClusterStatus status = mr.getJobTrackerRunner().getJobTracker()
-          .getClusterStatus(false);
-      if (status.getJobTrackerState() == JobTracker.State.RUNNING) {
-        mr.shutdown();
+    try {
+      if (mr != null) {
+        ClusterStatus status = mr.getJobTrackerRunner().getJobTracker()
+            .getClusterStatus(false);
+        if (status.getJobTrackerState() == JobTracker.State.RUNNING) {
+          mr.shutdown();
+        }
+      }
+    } finally {
+      mr = null;
+
+      try {
+        if (dfs != null) {
+          dfs.shutdown();
+        }
+      } finally {
+        dfs = null;
       }
     }
   }
@@ -526,7 +540,7 @@ public class TestRecoveryManager {
   @Test(timeout=120000)
   public void testJobTrackerInfoCreation() throws Exception {
     LOG.info("Testing jobtracker.info file");
-    MiniDFSCluster dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
+    dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
     String namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
         + (dfs.getFileSystem()).getUri().getPort();
     // shut down the data nodes
@@ -575,4 +589,104 @@ public class TestRecoveryManager {
     Assert.assertFalse("JobTracker failed to create info files with datanodes!",
         failed);
   }
+
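+  // Helper: create a directory and then set its permissions explicitly, so
+  // the resulting mode does not depend on the configured umask.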
+  static void mkdirWithPerms(FileSystem fs, String dir, short mode) throws IOException {
+    Path p = new Path(dir);
+    fs.mkdirs(p);
+    fs.setPermission(p, new FsPermission(mode));
+  }
+
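+  // Restart the jobtracker while a job submitted by a different user is
+  // running, and verify that the RecoveryManager resubmits the job and
+  // that it completes successfully.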
+  @Test(timeout=120000)
+  public void testJobResubmissionAsDifferentUser() throws Exception {
+    LOG.info("Testing job resubmission to the jobtracker as a different user");
+
+    final Path HDFS_TEST_DIR = new Path("/tmp");
+
+    JobConf conf = new JobConf();
+
+    dfs = new MiniDFSCluster(conf, 1, true, null);
+    fs = dfs.getFileSystem();
+
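+    // Use staging and system directories that this test controls, and make
+    // the current (jobtracker) user the owner of the system directory.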
+    conf.set("mapreduce.jobtracker.staging.root.dir", "/user");
+    conf.set("mapred.system.dir", "/mapred");
+    String mapredSysDir = conf.get("mapred.system.dir");
+    mkdirWithPerms(fs, mapredSysDir, (short)0700);
+    fs.setOwner(new Path(mapredSysDir),
+        UserGroupInformation.getCurrentUser().getUserName(), "mrgroup");
+
+    mkdirWithPerms(fs, "/user", (short)0777);
+    mkdirWithPerms(fs, "/mapred", (short)0777);
+    mkdirWithPerms(fs, "/tmp", (short)0777);
+
+    mr =
+        new MiniMRCluster(
+            1, dfs.getFileSystem().getUri().toString(), 1, null, null, conf);
+
+    String signalFile = new Path(HDFS_TEST_DIR, "signal").toString();
+
+    // make sure that the jobtracker is in recovery mode
+    mr.getJobTrackerConf()
+        .setBoolean("mapred.jobtracker.restart.recover", true);
+
+    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
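+    // Configure a waiting job: its map tasks block until the signal file
+    // appears in HDFS, which keeps the job running across the restart.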
+    final JobConf job1 = mr.createJobConf();
+    UtilsForTests.configureWaitingJobConf(job1, new Path(HDFS_TEST_DIR, "input"),
+        new Path(HDFS_TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
+        signalFile);
+
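+    // Submit the job as a separate test user ("bob") so that, on restart,
+    // the jobtracker has to recover a job owned by a different user.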
+    UserGroupInformation ugi =
+        UserGroupInformation.createUserForTesting("bob", new String[]{"users"});
+    job1.setUser(ugi.getUserName());
+
+    JobClient jc = new JobClient(job1);
+    RunningJob rJob1 = ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
+      public RunningJob run() throws IOException {
+        JobClient jc = new JobClient(job1);
+        return jc.submitJob(job1);
+      }
+    });
+    LOG.info("Submitted job " + rJob1.getID());
+
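+    // Let the job reach 50% map progress before the restart, so recovery
+    // happens while the job is mid-flight.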
+    while (rJob1.mapProgress() < 0.5f) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
+      UtilsForTests.waitFor(100);
+    }
+
+    // kill the jobtracker
+    LOG.info("Stopping jobtracker");
+    mr.stopJobTracker();
+
+    // start the jobtracker
+    LOG.info("Starting jobtracker");
+    mr.startJobTracker();
+    UtilsForTests.waitForJobTracker(jc);
+
+    jobtracker = mr.getJobTrackerRunner().getJobTracker();
+    // assert that the job is recovered by the jobtracker
+    Assert.assertEquals("Resubmission failed", 1, jobtracker.getAllJobs().length);
+    JobInProgress jip = jobtracker.getJob(rJob1.getID());
+
+    // signal the map tasks to complete
+    fs.create(new Path(HDFS_TEST_DIR, "signal"));
+    while (!jip.isComplete()) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
+      UtilsForTests.waitFor(100);
+    }
+    rJob1 = jc.getJob(rJob1.getID());
+    Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
+  }
 }